mercurial/keepalive.py
changeset 2435 ff2bac730b99
child 2444 5eb02f9ed804
equal deleted inserted replaced
2434:a2df85adface 2435:ff2bac730b99
       
     1 #   This library is free software; you can redistribute it and/or
       
     2 #   modify it under the terms of the GNU Lesser General Public
       
     3 #   License as published by the Free Software Foundation; either
       
     4 #   version 2.1 of the License, or (at your option) any later version.
       
     5 #
       
     6 #   This library is distributed in the hope that it will be useful,
       
     7 #   but WITHOUT ANY WARRANTY; without even the implied warranty of
       
     8 #   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
     9 #   Lesser General Public License for more details.
       
    10 #
       
    11 #   You should have received a copy of the GNU Lesser General Public
       
    12 #   License along with this library; if not, write to the 
       
    13 #      Free Software Foundation, Inc., 
       
    14 #      59 Temple Place, Suite 330, 
       
    15 #      Boston, MA  02111-1307  USA
       
    16 
       
    17 # This file is part of urlgrabber, a high-level cross-protocol url-grabber
       
    18 # Copyright 2002-2004 Michael D. Stenner, Ryan Tomayko
       
    19 
       
    20 """An HTTP handler for urllib2 that supports HTTP 1.1 and keepalive.
       
    21 
       
    22 >>> import urllib2
       
    23 >>> from keepalive import HTTPHandler
       
    24 >>> keepalive_handler = HTTPHandler()
       
    25 >>> opener = urllib2.build_opener(keepalive_handler)
       
    26 >>> urllib2.install_opener(opener)
       
    27 >>> 
       
    28 >>> fo = urllib2.urlopen('http://www.python.org')
       
    29 
       
    30 If a connection to a given host is requested, and all of the existing
       
    31 connections are still in use, another connection will be opened.  If
       
    32 the handler tries to use an existing connection but it fails in some
       
    33 way, it will be closed and removed from the pool.
       
    34 
       
    35 To remove the handler, simply re-run build_opener with no arguments, and
       
    36 install that opener.
       
    37 
       
    38 You can explicitly close connections by using the close_connection()
       
    39 method of the returned file-like object (described below) or you can
       
    40 use the handler methods:
       
    41 
       
    42   close_connection(host)
       
    43   close_all()
       
    44   open_connections()
       
    45 
       
    46 NOTE: using the close_connection and close_all methods of the handler
       
    47 should be done with care when using multiple threads.
       
    48   * there is nothing that prevents another thread from creating new
       
    49     connections immediately after connections are closed
       
    50   * no checks are done to prevent in-use connections from being closed
       
    51 
       
    52 >>> keepalive_handler.close_all()
       
    53 
       
    54 EXTRA ATTRIBUTES AND METHODS
       
    55 
       
    56   Upon a status of 200, the object returned has a few additional
       
    57   attributes and methods, which should not be used if you want to
       
    58   remain consistent with the normal urllib2-returned objects:
       
    59 
       
    60     close_connection()  -  close the connection to the host
       
    61     readlines()         -  you know, readlines()
       
    62     status              -  the return status (ie 404)
       
    63     reason              -  english translation of status (ie 'File not found')
       
    64 
       
    65   If you want the best of both worlds, use this inside an
       
    66   AttributeError-catching try:
       
    67 
       
    68   >>> try: status = fo.status
       
    69   >>> except AttributeError: status = None
       
    70 
       
    71   Unfortunately, these are ONLY there if status == 200, so it's not
       
    72   easy to distinguish between non-200 responses.  The reason is that
       
    73   urllib2 tries to do clever things with error codes 301, 302, 401,
       
    74   and 407, and it wraps the object upon return.
       
    75 
       
    76   For python versions earlier than 2.4, you can avoid this fancy error
       
    77   handling by setting the module-level global HANDLE_ERRORS to zero.
       
    78   You see, prior to 2.4, it's the HTTP Handler's job to determine what
       
    79   to handle specially, and what to just pass up.  HANDLE_ERRORS == 0
       
    80   means "pass everything up".  In python 2.4, however, this job no
       
    81   longer belongs to the HTTP Handler and is now done by a NEW handler,
       
    82   HTTPErrorProcessor.  Here's the bottom line:
       
    83 
       
    84     python version < 2.4
       
    85         HANDLE_ERRORS == 1  (default) pass up 200, treat the rest as
       
    86                             errors
       
    87         HANDLE_ERRORS == 0  pass everything up, error processing is
       
    88                             left to the calling code
       
    89     python version >= 2.4
       
    90         HANDLE_ERRORS == 1  pass up 200, treat the rest as errors
       
    91         HANDLE_ERRORS == 0  (default) pass everything up, let the
       
    92                             other handlers (specifically,
       
    93                             HTTPErrorProcessor) decide what to do
       
    94 
       
    95   In practice, setting the variable either way makes little difference
       
    96   in python 2.4, so for the most consistent behavior across versions,
       
    97   you probably just want to use the defaults, which will give you
       
    98   exceptions on errors.
       
    99 
       
   100 """
       
   101 
       
   102 # $Id: keepalive.py,v 1.13 2005/10/22 21:57:28 mstenner Exp $
       
   103 
       
   104 import urllib2
       
   105 import httplib
       
   106 import socket
       
   107 import thread
       
   108 
       
# Module-level debug hook: assign a logger-like object (anything with
# debug/info/warning/error methods, e.g. FakeLogger in test_timeout) to
# trace connection creation and reuse.  None disables all debug output.
DEBUG = None

import sys
# Prior to python 2.4 the HTTP handler itself decides which status codes
# to treat as errors; from 2.4 on that job belongs to HTTPErrorProcessor.
# See the module docstring for the full explanation of HANDLE_ERRORS.
if sys.version_info < (2, 4): HANDLE_ERRORS = 1
else: HANDLE_ERRORS = 0
       
   114     
       
   115 class ConnectionManager:
       
   116     """
       
   117     The connection manager must be able to:
       
   118       * keep track of all existing
       
   119       """
       
   120     def __init__(self):
       
   121         self._lock = thread.allocate_lock()
       
   122         self._hostmap = {} # map hosts to a list of connections
       
   123         self._connmap = {} # map connections to host
       
   124         self._readymap = {} # map connection to ready state
       
   125 
       
   126     def add(self, host, connection, ready):
       
   127         self._lock.acquire()
       
   128         try:
       
   129             if not self._hostmap.has_key(host): self._hostmap[host] = []
       
   130             self._hostmap[host].append(connection)
       
   131             self._connmap[connection] = host
       
   132             self._readymap[connection] = ready
       
   133         finally:
       
   134             self._lock.release()
       
   135 
       
   136     def remove(self, connection):
       
   137         self._lock.acquire()
       
   138         try:
       
   139             try:
       
   140                 host = self._connmap[connection]
       
   141             except KeyError:
       
   142                 pass
       
   143             else:
       
   144                 del self._connmap[connection]
       
   145                 del self._readymap[connection]
       
   146                 self._hostmap[host].remove(connection)
       
   147                 if not self._hostmap[host]: del self._hostmap[host]
       
   148         finally:
       
   149             self._lock.release()
       
   150 
       
   151     def set_ready(self, connection, ready):
       
   152         try: self._readymap[connection] = ready
       
   153         except KeyError: pass
       
   154         
       
   155     def get_ready_conn(self, host):
       
   156         conn = None
       
   157         self._lock.acquire()
       
   158         try:
       
   159             if self._hostmap.has_key(host):
       
   160                 for c in self._hostmap[host]:
       
   161                     if self._readymap[c]:
       
   162                         self._readymap[c] = 0
       
   163                         conn = c
       
   164                         break
       
   165         finally:
       
   166             self._lock.release()
       
   167         return conn
       
   168 
       
   169     def get_all(self, host=None):
       
   170         if host:
       
   171             return list(self._hostmap.get(host, []))
       
   172         else:
       
   173             return dict(self._hostmap)
       
   174 
       
   175 class HTTPHandler(urllib2.HTTPHandler):
       
   176     def __init__(self):
       
   177         self._cm = ConnectionManager()
       
   178         
       
   179     #### Connection Management
       
   180     def open_connections(self):
       
   181         """return a list of connected hosts and the number of connections
       
   182         to each.  [('foo.com:80', 2), ('bar.org', 1)]"""
       
   183         return [(host, len(li)) for (host, li) in self._cm.get_all().items()]
       
   184 
       
   185     def close_connection(self, host):
       
   186         """close connection(s) to <host>
       
   187         host is the host:port spec, as in 'www.cnn.com:8080' as passed in.
       
   188         no error occurs if there is no connection to that host."""
       
   189         for h in self._cm.get_all(host):
       
   190             self._cm.remove(h)
       
   191             h.close()
       
   192         
       
   193     def close_all(self):
       
   194         """close all open connections"""
       
   195         for host, conns in self._cm.get_all().items():
       
   196             for h in conns:
       
   197                 self._cm.remove(h)
       
   198                 h.close()
       
   199         
       
   200     def _request_closed(self, request, host, connection):
       
   201         """tells us that this request is now closed and the the
       
   202         connection is ready for another request"""
       
   203         self._cm.set_ready(connection, 1)
       
   204 
       
   205     def _remove_connection(self, host, connection, close=0):
       
   206         if close: connection.close()
       
   207         self._cm.remove(connection)
       
   208         
       
   209     #### Transaction Execution
       
   210     def http_open(self, req):
       
   211         return self.do_open(HTTPConnection, req)
       
   212 
       
   213     def do_open(self, http_class, req):
       
   214         host = req.get_host()
       
   215         if not host:
       
   216             raise urllib2.URLError('no host given')
       
   217 
       
   218         try:
       
   219             h = self._cm.get_ready_conn(host)
       
   220             while h:
       
   221                 r = self._reuse_connection(h, req, host)
       
   222 
       
   223                 # if this response is non-None, then it worked and we're
       
   224                 # done.  Break out, skipping the else block.
       
   225                 if r: break
       
   226 
       
   227                 # connection is bad - possibly closed by server
       
   228                 # discard it and ask for the next free connection
       
   229                 h.close()
       
   230                 self._cm.remove(h)
       
   231                 h = self._cm.get_ready_conn(host)
       
   232             else:
       
   233                 # no (working) free connections were found.  Create a new one.
       
   234                 h = http_class(host)
       
   235                 if DEBUG: DEBUG.info("creating new connection to %s (%d)",
       
   236                                      host, id(h))
       
   237                 self._cm.add(host, h, 0)
       
   238                 self._start_transaction(h, req)
       
   239                 r = h.getresponse()
       
   240         except (socket.error, httplib.HTTPException), err:
       
   241             raise urllib2.URLError(err)
       
   242             
       
   243         # if not a persistent connection, don't try to reuse it
       
   244         if r.will_close: self._cm.remove(h)
       
   245 
       
   246         if DEBUG: DEBUG.info("STATUS: %s, %s", r.status, r.reason)
       
   247         r._handler = self
       
   248         r._host = host
       
   249         r._url = req.get_full_url()
       
   250         r._connection = h
       
   251         r.code = r.status
       
   252         
       
   253         if r.status == 200 or not HANDLE_ERRORS:
       
   254             return r
       
   255         else:
       
   256             return self.parent.error('http', req, r, r.status, r.reason, r.msg)
       
   257 
       
   258 
       
   259     def _reuse_connection(self, h, req, host):
       
   260         """start the transaction with a re-used connection
       
   261         return a response object (r) upon success or None on failure.
       
   262         This DOES not close or remove bad connections in cases where
       
   263         it returns.  However, if an unexpected exception occurs, it
       
   264         will close and remove the connection before re-raising.
       
   265         """
       
   266         try:
       
   267             self._start_transaction(h, req)
       
   268             r = h.getresponse()
       
   269             # note: just because we got something back doesn't mean it
       
   270             # worked.  We'll check the version below, too.
       
   271         except (socket.error, httplib.HTTPException):
       
   272             r = None
       
   273         except:
       
   274             # adding this block just in case we've missed
       
   275             # something we will still raise the exception, but
       
   276             # lets try and close the connection and remove it
       
   277             # first.  We previously got into a nasty loop
       
   278             # where an exception was uncaught, and so the
       
   279             # connection stayed open.  On the next try, the
       
   280             # same exception was raised, etc.  The tradeoff is
       
   281             # that it's now possible this call will raise
       
   282             # a DIFFERENT exception
       
   283             if DEBUG: DEBUG.error("unexpected exception - closing " + \
       
   284                                   "connection to %s (%d)", host, id(h))
       
   285             self._cm.remove(h)
       
   286             h.close()
       
   287             raise
       
   288                     
       
   289         if r is None or r.version == 9:
       
   290             # httplib falls back to assuming HTTP 0.9 if it gets a
       
   291             # bad header back.  This is most likely to happen if
       
   292             # the socket has been closed by the server since we
       
   293             # last used the connection.
       
   294             if DEBUG: DEBUG.info("failed to re-use connection to %s (%d)",
       
   295                                  host, id(h))
       
   296             r = None
       
   297         else:
       
   298             if DEBUG: DEBUG.info("re-using connection to %s (%d)", host, id(h))
       
   299 
       
   300         return r
       
   301 
       
   302     def _start_transaction(self, h, req):
       
   303         try:
       
   304             if req.has_data():
       
   305                 data = req.get_data()
       
   306                 h.putrequest('POST', req.get_selector())
       
   307                 if not req.headers.has_key('Content-type'):
       
   308                     h.putheader('Content-type',
       
   309                                 'application/x-www-form-urlencoded')
       
   310                 if not req.headers.has_key('Content-length'):
       
   311                     h.putheader('Content-length', '%d' % len(data))
       
   312             else:
       
   313                 h.putrequest('GET', req.get_selector())
       
   314         except (socket.error, httplib.HTTPException), err:
       
   315             raise urllib2.URLError(err)
       
   316 
       
   317         for args in self.parent.addheaders:
       
   318             h.putheader(*args)
       
   319         for k, v in req.headers.items():
       
   320             h.putheader(k, v)
       
   321         h.endheaders()
       
   322         if req.has_data():
       
   323             h.send(data)
       
   324 
       
   325 class HTTPResponse(httplib.HTTPResponse):
       
   326     # we need to subclass HTTPResponse in order to
       
   327     # 1) add readline() and readlines() methods
       
   328     # 2) add close_connection() methods
       
   329     # 3) add info() and geturl() methods
       
   330 
       
   331     # in order to add readline(), read must be modified to deal with a
       
   332     # buffer.  example: readline must read a buffer and then spit back
       
   333     # one line at a time.  The only real alternative is to read one
       
   334     # BYTE at a time (ick).  Once something has been read, it can't be
       
   335     # put back (ok, maybe it can, but that's even uglier than this),
       
   336     # so if you THEN do a normal read, you must first take stuff from
       
   337     # the buffer.
       
   338 
       
   339     # the read method wraps the original to accomodate buffering,
       
   340     # although read() never adds to the buffer.
       
   341     # Both readline and readlines have been stolen with almost no
       
   342     # modification from socket.py
       
   343     
       
   344 
       
   345     def __init__(self, sock, debuglevel=0, strict=0, method=None):
       
   346         if method: # the httplib in python 2.3 uses the method arg
       
   347             httplib.HTTPResponse.__init__(self, sock, debuglevel, method)
       
   348         else: # 2.2 doesn't
       
   349             httplib.HTTPResponse.__init__(self, sock, debuglevel)
       
   350         self.fileno = sock.fileno
       
   351         self.code = None
       
   352         self._rbuf = ''
       
   353         self._rbufsize = 8096
       
   354         self._handler = None # inserted by the handler later
       
   355         self._host = None    # (same)
       
   356         self._url = None     # (same)
       
   357         self._connection = None # (same)
       
   358 
       
   359     _raw_read = httplib.HTTPResponse.read
       
   360 
       
   361     def close(self):
       
   362         if self.fp:
       
   363             self.fp.close()
       
   364             self.fp = None
       
   365             if self._handler:
       
   366                 self._handler._request_closed(self, self._host,
       
   367                                               self._connection)
       
   368 
       
   369     def close_connection(self):
       
   370         self._handler._remove_connection(self._host, self._connection, close=1)
       
   371         self.close()
       
   372         
       
   373     def info(self):
       
   374         return self.msg
       
   375 
       
   376     def geturl(self):
       
   377         return self._url
       
   378 
       
   379     def read(self, amt=None):
       
   380         # the _rbuf test is only in this first if for speed.  It's not
       
   381         # logically necessary
       
   382         if self._rbuf and not amt is None:
       
   383             L = len(self._rbuf)
       
   384             if amt > L:
       
   385                 amt -= L
       
   386             else:
       
   387                 s = self._rbuf[:amt]
       
   388                 self._rbuf = self._rbuf[amt:]
       
   389                 return s
       
   390 
       
   391         s = self._rbuf + self._raw_read(amt)
       
   392         self._rbuf = ''
       
   393         return s
       
   394 
       
   395     def readline(self, limit=-1):
       
   396         data = ""
       
   397         i = self._rbuf.find('\n')
       
   398         while i < 0 and not (0 < limit <= len(self._rbuf)):
       
   399             new = self._raw_read(self._rbufsize)
       
   400             if not new: break
       
   401             i = new.find('\n')
       
   402             if i >= 0: i = i + len(self._rbuf)
       
   403             self._rbuf = self._rbuf + new
       
   404         if i < 0: i = len(self._rbuf)
       
   405         else: i = i+1
       
   406         if 0 <= limit < len(self._rbuf): i = limit
       
   407         data, self._rbuf = self._rbuf[:i], self._rbuf[i:]
       
   408         return data
       
   409 
       
   410     def readlines(self, sizehint = 0):
       
   411         total = 0
       
   412         list = []
       
   413         while 1:
       
   414             line = self.readline()
       
   415             if not line: break
       
   416             list.append(line)
       
   417             total += len(line)
       
   418             if sizehint and total >= sizehint:
       
   419                 break
       
   420         return list
       
   421 
       
   422 
       
   423 class HTTPConnection(httplib.HTTPConnection):
       
   424     # use the modified response class
       
   425     response_class = HTTPResponse
       
   426     
       
   427 #########################################################################
       
   428 #####   TEST FUNCTIONS
       
   429 #########################################################################
       
   430 
       
   431 def error_handler(url):
       
   432     global HANDLE_ERRORS
       
   433     orig = HANDLE_ERRORS
       
   434     keepalive_handler = HTTPHandler()
       
   435     opener = urllib2.build_opener(keepalive_handler)
       
   436     urllib2.install_opener(opener)
       
   437     pos = {0: 'off', 1: 'on'}
       
   438     for i in (0, 1):
       
   439         print "  fancy error handling %s (HANDLE_ERRORS = %i)" % (pos[i], i)
       
   440         HANDLE_ERRORS = i
       
   441         try:
       
   442             fo = urllib2.urlopen(url)
       
   443             foo = fo.read()
       
   444             fo.close()
       
   445             try: status, reason = fo.status, fo.reason
       
   446             except AttributeError: status, reason = None, None
       
   447         except IOError, e:
       
   448             print "  EXCEPTION: %s" % e
       
   449             raise
       
   450         else:
       
   451             print "  status = %s, reason = %s" % (status, reason)
       
   452     HANDLE_ERRORS = orig
       
   453     hosts = keepalive_handler.open_connections()
       
   454     print "open connections:", hosts
       
   455     keepalive_handler.close_all()
       
   456 
       
   457 def continuity(url):
       
   458     import md5
       
   459     format = '%25s: %s'
       
   460     
       
   461     # first fetch the file with the normal http handler
       
   462     opener = urllib2.build_opener()
       
   463     urllib2.install_opener(opener)
       
   464     fo = urllib2.urlopen(url)
       
   465     foo = fo.read()
       
   466     fo.close()
       
   467     m = md5.new(foo)
       
   468     print format % ('normal urllib', m.hexdigest())
       
   469 
       
   470     # now install the keepalive handler and try again
       
   471     opener = urllib2.build_opener(HTTPHandler())
       
   472     urllib2.install_opener(opener)
       
   473 
       
   474     fo = urllib2.urlopen(url)
       
   475     foo = fo.read()
       
   476     fo.close()
       
   477     m = md5.new(foo)
       
   478     print format % ('keepalive read', m.hexdigest())
       
   479 
       
   480     fo = urllib2.urlopen(url)
       
   481     foo = ''
       
   482     while 1:
       
   483         f = fo.readline()
       
   484         if f: foo = foo + f
       
   485         else: break
       
   486     fo.close()
       
   487     m = md5.new(foo)
       
   488     print format % ('keepalive readline', m.hexdigest())
       
   489 
       
   490 def comp(N, url):
       
   491     print '  making %i connections to:\n  %s' % (N, url)
       
   492 
       
   493     sys.stdout.write('  first using the normal urllib handlers')
       
   494     # first use normal opener
       
   495     opener = urllib2.build_opener()
       
   496     urllib2.install_opener(opener)
       
   497     t1 = fetch(N, url)
       
   498     print '  TIME: %.3f s' % t1
       
   499 
       
   500     sys.stdout.write('  now using the keepalive handler       ')
       
   501     # now install the keepalive handler and try again
       
   502     opener = urllib2.build_opener(HTTPHandler())
       
   503     urllib2.install_opener(opener)
       
   504     t2 = fetch(N, url)
       
   505     print '  TIME: %.3f s' % t2
       
   506     print '  improvement factor: %.2f' % (t1/t2, )
       
   507     
       
   508 def fetch(N, url, delay=0):
       
   509     import time
       
   510     lens = []
       
   511     starttime = time.time()
       
   512     for i in range(N):
       
   513         if delay and i > 0: time.sleep(delay)
       
   514         fo = urllib2.urlopen(url)
       
   515         foo = fo.read()
       
   516         fo.close()
       
   517         lens.append(len(foo))
       
   518     diff = time.time() - starttime
       
   519 
       
   520     j = 0
       
   521     for i in lens[1:]:
       
   522         j = j + 1
       
   523         if not i == lens[0]:
       
   524             print "WARNING: inconsistent length on read %i: %i" % (j, i)
       
   525 
       
   526     return diff
       
   527 
       
   528 def test_timeout(url):
       
   529     global DEBUG
       
   530     dbbackup = DEBUG
       
   531     class FakeLogger:
       
   532         def debug(self, msg, *args): print msg % args
       
   533         info = warning = error = debug
       
   534     DEBUG = FakeLogger()
       
   535     print "  fetching the file to establish a connection"
       
   536     fo = urllib2.urlopen(url)
       
   537     data1 = fo.read()
       
   538     fo.close()
       
   539  
       
   540     i = 20
       
   541     print "  waiting %i seconds for the server to close the connection" % i
       
   542     while i > 0:
       
   543         sys.stdout.write('\r  %2i' % i)
       
   544         sys.stdout.flush()
       
   545         time.sleep(1)
       
   546         i -= 1
       
   547     sys.stderr.write('\r')
       
   548 
       
   549     print "  fetching the file a second time"
       
   550     fo = urllib2.urlopen(url)
       
   551     data2 = fo.read()
       
   552     fo.close()
       
   553 
       
   554     if data1 == data2:
       
   555         print '  data are identical'
       
   556     else:
       
   557         print '  ERROR: DATA DIFFER'
       
   558 
       
   559     DEBUG = dbbackup
       
   560 
       
   561     
       
   562 def test(url, N=10):
       
   563     print "checking error hander (do this on a non-200)"
       
   564     try: error_handler(url)
       
   565     except IOError, e:
       
   566         print "exiting - exception will prevent further tests"
       
   567         sys.exit()
       
   568     print
       
   569     print "performing continuity test (making sure stuff isn't corrupted)"
       
   570     continuity(url)
       
   571     print
       
   572     print "performing speed comparison"
       
   573     comp(N, url)
       
   574     print
       
   575     print "performing dropped-connection check"
       
   576     test_timeout(url)
       
   577     
       
   578 if __name__ == '__main__':
       
   579     import time
       
   580     import sys
       
   581     try:
       
   582         N = int(sys.argv[1])
       
   583         url = sys.argv[2]
       
   584     except:
       
   585         print "%s <integer> <url>" % sys.argv[0]
       
   586     else:
       
   587         test(url, N)