comparison mercurial/keepalive.py @ 2435:ff2bac730b99

http client: support persistent connections. uses keepalive module from urlgrabber package. tested against "hg serve", cgi server, and through http proxy. used ethereal to verify that only one tcp connection used during entire "hg pull" sequence. if server supports keepalive, this makes latency of "hg pull" much lower.
author Vadim Gelfer <vadim.gelfer@gmail.com>
date Thu, 15 Jun 2006 12:57:59 -0700
parents
children 5eb02f9ed804
comparison
equal deleted inserted replaced
2434:a2df85adface 2435:ff2bac730b99
1 # This library is free software; you can redistribute it and/or
2 # modify it under the terms of the GNU Lesser General Public
3 # License as published by the Free Software Foundation; either
4 # version 2.1 of the License, or (at your option) any later version.
5 #
6 # This library is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9 # Lesser General Public License for more details.
10 #
11 # You should have received a copy of the GNU Lesser General Public
12 # License along with this library; if not, write to the
13 # Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330,
15 # Boston, MA 02111-1307 USA
16
17 # This file is part of urlgrabber, a high-level cross-protocol url-grabber
18 # Copyright 2002-2004 Michael D. Stenner, Ryan Tomayko
19
20 """An HTTP handler for urllib2 that supports HTTP 1.1 and keepalive.
21
22 >>> import urllib2
23 >>> from keepalive import HTTPHandler
24 >>> keepalive_handler = HTTPHandler()
25 >>> opener = urllib2.build_opener(keepalive_handler)
26 >>> urllib2.install_opener(opener)
27 >>>
28 >>> fo = urllib2.urlopen('http://www.python.org')
29
30 If a connection to a given host is requested, and all of the existing
31 connections are still in use, another connection will be opened. If
32 the handler tries to use an existing connection but it fails in some
33 way, it will be closed and removed from the pool.
34
35 To remove the handler, simply re-run build_opener with no arguments, and
36 install that opener.
37
38 You can explicitly close connections by using the close_connection()
39 method of the returned file-like object (described below) or you can
40 use the handler methods:
41
42 close_connection(host)
43 close_all()
44 open_connections()
45
46 NOTE: using the close_connection and close_all methods of the handler
47 should be done with care when using multiple threads.
48 * there is nothing that prevents another thread from creating new
49 connections immediately after connections are closed
50 * no checks are done to prevent in-use connections from being closed
51
52 >>> keepalive_handler.close_all()
53
54 EXTRA ATTRIBUTES AND METHODS
55
56 Upon a status of 200, the object returned has a few additional
57 attributes and methods, which should not be used if you want to
58 remain consistent with the normal urllib2-returned objects:
59
60 close_connection() - close the connection to the host
61 readlines() - you know, readlines()
status - the return status (e.g. 404)
reason - English translation of status (e.g. 'File not found')
64
65 If you want the best of both worlds, use this inside an
66 AttributeError-catching try:
67
>>> try: status = fo.status
... except AttributeError: status = None
70
71 Unfortunately, these are ONLY there if status == 200, so it's not
72 easy to distinguish between non-200 responses. The reason is that
73 urllib2 tries to do clever things with error codes 301, 302, 401,
74 and 407, and it wraps the object upon return.
75
76 For python versions earlier than 2.4, you can avoid this fancy error
77 handling by setting the module-level global HANDLE_ERRORS to zero.
78 You see, prior to 2.4, it's the HTTP Handler's job to determine what
79 to handle specially, and what to just pass up. HANDLE_ERRORS == 0
80 means "pass everything up". In python 2.4, however, this job no
81 longer belongs to the HTTP Handler and is now done by a NEW handler,
82 HTTPErrorProcessor. Here's the bottom line:
83
84 python version < 2.4
85 HANDLE_ERRORS == 1 (default) pass up 200, treat the rest as
86 errors
87 HANDLE_ERRORS == 0 pass everything up, error processing is
88 left to the calling code
89 python version >= 2.4
90 HANDLE_ERRORS == 1 pass up 200, treat the rest as errors
91 HANDLE_ERRORS == 0 (default) pass everything up, let the
92 other handlers (specifically,
93 HTTPErrorProcessor) decide what to do
94
95 In practice, setting the variable either way makes little difference
96 in python 2.4, so for the most consistent behavior across versions,
97 you probably just want to use the defaults, which will give you
98 exceptions on errors.
99
100 """
101
102 # $Id: keepalive.py,v 1.13 2005/10/22 21:57:28 mstenner Exp $
103
104 import urllib2
105 import httplib
106 import socket
107 import thread
108
# Optional logger used to trace connection creation/reuse; the module calls
# DEBUG.info() and DEBUG.error() on it.  None disables tracing.
DEBUG = None

import sys
# Before Python 2.4 the HTTP handler itself decides which statuses are
# errors (1); from 2.4 on HTTPErrorProcessor does that job, so by default
# everything is passed up (0).  See the module docstring.
HANDLE_ERRORS = int(sys.version_info < (2, 4))
114
class ConnectionManager:
    """
    Lock-protected bookkeeping for the keepalive connection pool.

    The connection manager must be able to:
      * keep track of all existing connections, grouped by host
      * remember whether each connection is ready for a new request
      * hand out a ready connection to only one caller (it is marked
        busy under the lock before being returned)
    """
    def __init__(self):
        self._lock = thread.allocate_lock()
        self._hostmap = {} # map hosts to a list of connections
        self._connmap = {} # map connections to host
        self._readymap = {} # map connection to ready state

    def add(self, host, connection, ready):
        """register <connection> under <host> with the given ready flag"""
        self._lock.acquire()
        try:
            if host not in self._hostmap: self._hostmap[host] = []
            self._hostmap[host].append(connection)
            self._connmap[connection] = host
            self._readymap[connection] = ready
        finally:
            self._lock.release()

    def remove(self, connection):
        """forget <connection>; connections that are not registered are
        silently ignored, so calling this twice is harmless"""
        self._lock.acquire()
        try:
            try:
                host = self._connmap[connection]
            except KeyError:
                pass
            else:
                del self._connmap[connection]
                del self._readymap[connection]
                self._hostmap[host].remove(connection)
                if not self._hostmap[host]: del self._hostmap[host]
        finally:
            self._lock.release()

    def set_ready(self, connection, ready):
        """set the ready flag of a registered connection.

        Connections that are no longer registered (e.g. already removed)
        are ignored."""
        self._lock.acquire()
        try:
            # A plain assignment would re-insert a removed connection into
            # _readymap, and remove() -- which looks connections up in
            # _connmap -- could then never delete that entry again.
            if connection in self._readymap:
                self._readymap[connection] = ready
        finally:
            self._lock.release()

    def get_ready_conn(self, host):
        """return a ready connection to <host> (marking it not ready) or
        None if there is none"""
        conn = None
        self._lock.acquire()
        try:
            if host in self._hostmap:
                for c in self._hostmap[host]:
                    if self._readymap[c]:
                        self._readymap[c] = 0
                        conn = c
                        break
        finally:
            self._lock.release()
        return conn

    def get_all(self, host=None):
        """return a snapshot of the registered connections.

        With <host>: a list of that host's connections (empty if none).
        Without: a dict mapping every host to a list of its connections.
        The lists are copies, so callers may call remove() while
        iterating over them."""
        self._lock.acquire()
        try:
            if host:
                return list(self._hostmap.get(host, []))
            else:
                return dict([(h, list(conns))
                             for h, conns in self._hostmap.items()])
        finally:
            self._lock.release()
174
175 class HTTPHandler(urllib2.HTTPHandler):
176 def __init__(self):
177 self._cm = ConnectionManager()
178
179 #### Connection Management
180 def open_connections(self):
181 """return a list of connected hosts and the number of connections
182 to each. [('foo.com:80', 2), ('bar.org', 1)]"""
183 return [(host, len(li)) for (host, li) in self._cm.get_all().items()]
184
185 def close_connection(self, host):
186 """close connection(s) to <host>
187 host is the host:port spec, as in 'www.cnn.com:8080' as passed in.
188 no error occurs if there is no connection to that host."""
189 for h in self._cm.get_all(host):
190 self._cm.remove(h)
191 h.close()
192
193 def close_all(self):
194 """close all open connections"""
195 for host, conns in self._cm.get_all().items():
196 for h in conns:
197 self._cm.remove(h)
198 h.close()
199
200 def _request_closed(self, request, host, connection):
201 """tells us that this request is now closed and the the
202 connection is ready for another request"""
203 self._cm.set_ready(connection, 1)
204
205 def _remove_connection(self, host, connection, close=0):
206 if close: connection.close()
207 self._cm.remove(connection)
208
209 #### Transaction Execution
210 def http_open(self, req):
211 return self.do_open(HTTPConnection, req)
212
213 def do_open(self, http_class, req):
214 host = req.get_host()
215 if not host:
216 raise urllib2.URLError('no host given')
217
218 try:
219 h = self._cm.get_ready_conn(host)
220 while h:
221 r = self._reuse_connection(h, req, host)
222
223 # if this response is non-None, then it worked and we're
224 # done. Break out, skipping the else block.
225 if r: break
226
227 # connection is bad - possibly closed by server
228 # discard it and ask for the next free connection
229 h.close()
230 self._cm.remove(h)
231 h = self._cm.get_ready_conn(host)
232 else:
233 # no (working) free connections were found. Create a new one.
234 h = http_class(host)
235 if DEBUG: DEBUG.info("creating new connection to %s (%d)",
236 host, id(h))
237 self._cm.add(host, h, 0)
238 self._start_transaction(h, req)
239 r = h.getresponse()
240 except (socket.error, httplib.HTTPException), err:
241 raise urllib2.URLError(err)
242
243 # if not a persistent connection, don't try to reuse it
244 if r.will_close: self._cm.remove(h)
245
246 if DEBUG: DEBUG.info("STATUS: %s, %s", r.status, r.reason)
247 r._handler = self
248 r._host = host
249 r._url = req.get_full_url()
250 r._connection = h
251 r.code = r.status
252
253 if r.status == 200 or not HANDLE_ERRORS:
254 return r
255 else:
256 return self.parent.error('http', req, r, r.status, r.reason, r.msg)
257
258
259 def _reuse_connection(self, h, req, host):
260 """start the transaction with a re-used connection
261 return a response object (r) upon success or None on failure.
262 This DOES not close or remove bad connections in cases where
263 it returns. However, if an unexpected exception occurs, it
264 will close and remove the connection before re-raising.
265 """
266 try:
267 self._start_transaction(h, req)
268 r = h.getresponse()
269 # note: just because we got something back doesn't mean it
270 # worked. We'll check the version below, too.
271 except (socket.error, httplib.HTTPException):
272 r = None
273 except:
274 # adding this block just in case we've missed
275 # something we will still raise the exception, but
276 # lets try and close the connection and remove it
277 # first. We previously got into a nasty loop
278 # where an exception was uncaught, and so the
279 # connection stayed open. On the next try, the
280 # same exception was raised, etc. The tradeoff is
281 # that it's now possible this call will raise
282 # a DIFFERENT exception
283 if DEBUG: DEBUG.error("unexpected exception - closing " + \
284 "connection to %s (%d)", host, id(h))
285 self._cm.remove(h)
286 h.close()
287 raise
288
289 if r is None or r.version == 9:
290 # httplib falls back to assuming HTTP 0.9 if it gets a
291 # bad header back. This is most likely to happen if
292 # the socket has been closed by the server since we
293 # last used the connection.
294 if DEBUG: DEBUG.info("failed to re-use connection to %s (%d)",
295 host, id(h))
296 r = None
297 else:
298 if DEBUG: DEBUG.info("re-using connection to %s (%d)", host, id(h))
299
300 return r
301
302 def _start_transaction(self, h, req):
303 try:
304 if req.has_data():
305 data = req.get_data()
306 h.putrequest('POST', req.get_selector())
307 if not req.headers.has_key('Content-type'):
308 h.putheader('Content-type',
309 'application/x-www-form-urlencoded')
310 if not req.headers.has_key('Content-length'):
311 h.putheader('Content-length', '%d' % len(data))
312 else:
313 h.putrequest('GET', req.get_selector())
314 except (socket.error, httplib.HTTPException), err:
315 raise urllib2.URLError(err)
316
317 for args in self.parent.addheaders:
318 h.putheader(*args)
319 for k, v in req.headers.items():
320 h.putheader(k, v)
321 h.endheaders()
322 if req.has_data():
323 h.send(data)
324
class HTTPResponse(httplib.HTTPResponse):
    # we need to subclass HTTPResponse in order to
    # 1) add readline() and readlines() methods
    # 2) add close_connection() methods
    # 3) add info() and geturl() methods

    # in order to add readline(), read must be modified to deal with a
    # buffer.  example: readline must read a buffer and then spit back
    # one line at a time.  The only real alternative is to read one
    # BYTE at a time (ick).  Once something has been read, it can't be
    # put back (ok, maybe it can, but that's even uglier than this),
    # so if you THEN do a normal read, you must first take stuff from
    # the buffer.

    # the read method wraps the original to accommodate buffering,
    # although read() never adds to the buffer.
    # Both readline and readlines have been stolen with almost no
    # modification from socket.py


    def __init__(self, sock, debuglevel=0, strict=0, method=None):
        if method: # newer httplib versions accept the method argument
            # Pass the optional arguments by keyword.  In httplib versions
            # that also accept "strict", it comes before "method"
            # positionally, so passing method as the third positional
            # argument would land in strict and the request method would be
            # lost.
            kwargs = {'method': method}
            if strict:
                kwargs['strict'] = strict
            httplib.HTTPResponse.__init__(self, sock, debuglevel, **kwargs)
        else: # 2.2 doesn't
            httplib.HTTPResponse.__init__(self, sock, debuglevel)
        self.fileno = sock.fileno
        self.code = None
        self._rbuf = ''           # data read ahead by readline()
        self._rbufsize = 8096     # chunk size readline() asks _raw_read for
        self._handler = None      # inserted by the handler later
        self._host = None         # (same)
        self._url = None          # (same)
        self._connection = None   # (same)

    _raw_read = httplib.HTTPResponse.read

    def close(self):
        """close the response; the first time, also tell the handler (if
        any) that the underlying connection may be reused"""
        if self.fp:
            self.fp.close()
            self.fp = None
            if self._handler:
                self._handler._request_closed(self, self._host,
                                              self._connection)

    def close_connection(self):
        """drop the underlying connection from the handler's pool, close
        it, then close this response"""
        self._handler._remove_connection(self._host, self._connection, close=1)
        self.close()

    def info(self):
        """return the response headers (urllib2 file-object interface)"""
        return self.msg

    def geturl(self):
        """return the URL that was requested"""
        return self._url

    def read(self, amt=None):
        """read up to <amt> bytes (everything if None), serving data
        buffered by readline() first"""
        # the _rbuf test is only in this first if for speed.  It's not
        # logically necessary
        if self._rbuf and amt is not None:
            L = len(self._rbuf)
            if amt > L:
                amt -= L
            else:
                s = self._rbuf[:amt]
                self._rbuf = self._rbuf[amt:]
                return s

        s = self._rbuf + self._raw_read(amt)
        self._rbuf = ''
        return s

    def readline(self, limit=-1):
        """return one line including its trailing newline (or the rest of
        the data at EOF); if limit >= 0, at most <limit> characters"""
        data = ""
        i = self._rbuf.find('\n')
        while i < 0 and not (0 < limit <= len(self._rbuf)):
            new = self._raw_read(self._rbufsize)
            if not new: break
            i = new.find('\n')
            if i >= 0: i = i + len(self._rbuf)
            self._rbuf = self._rbuf + new
        if i < 0: i = len(self._rbuf)
        else: i = i+1
        if 0 <= limit < len(self._rbuf): i = limit
        data, self._rbuf = self._rbuf[:i], self._rbuf[i:]
        return data

    def readlines(self, sizehint = 0):
        """return a list of lines; stop early once at least <sizehint>
        characters have been collected (if sizehint is non-zero)"""
        total = 0
        lines = []
        while 1:
            line = self.readline()
            if not line: break
            lines.append(line)
            total += len(line)
            if sizehint and total >= sizehint:
                break
        return lines
421
422
class HTTPConnection(httplib.HTTPConnection):
    """httplib connection whose getresponse() builds this module's
    HTTPResponse, which adds buffered readline()/readlines(), info(),
    geturl() and keepalive bookkeeping on close."""
    response_class = HTTPResponse
426
427 #########################################################################
428 ##### TEST FUNCTIONS
429 #########################################################################
430
431 def error_handler(url):
432 global HANDLE_ERRORS
433 orig = HANDLE_ERRORS
434 keepalive_handler = HTTPHandler()
435 opener = urllib2.build_opener(keepalive_handler)
436 urllib2.install_opener(opener)
437 pos = {0: 'off', 1: 'on'}
438 for i in (0, 1):
439 print " fancy error handling %s (HANDLE_ERRORS = %i)" % (pos[i], i)
440 HANDLE_ERRORS = i
441 try:
442 fo = urllib2.urlopen(url)
443 foo = fo.read()
444 fo.close()
445 try: status, reason = fo.status, fo.reason
446 except AttributeError: status, reason = None, None
447 except IOError, e:
448 print " EXCEPTION: %s" % e
449 raise
450 else:
451 print " status = %s, reason = %s" % (status, reason)
452 HANDLE_ERRORS = orig
453 hosts = keepalive_handler.open_connections()
454 print "open connections:", hosts
455 keepalive_handler.close_all()
456
457 def continuity(url):
458 import md5
459 format = '%25s: %s'
460
461 # first fetch the file with the normal http handler
462 opener = urllib2.build_opener()
463 urllib2.install_opener(opener)
464 fo = urllib2.urlopen(url)
465 foo = fo.read()
466 fo.close()
467 m = md5.new(foo)
468 print format % ('normal urllib', m.hexdigest())
469
470 # now install the keepalive handler and try again
471 opener = urllib2.build_opener(HTTPHandler())
472 urllib2.install_opener(opener)
473
474 fo = urllib2.urlopen(url)
475 foo = fo.read()
476 fo.close()
477 m = md5.new(foo)
478 print format % ('keepalive read', m.hexdigest())
479
480 fo = urllib2.urlopen(url)
481 foo = ''
482 while 1:
483 f = fo.readline()
484 if f: foo = foo + f
485 else: break
486 fo.close()
487 m = md5.new(foo)
488 print format % ('keepalive readline', m.hexdigest())
489
490 def comp(N, url):
491 print ' making %i connections to:\n %s' % (N, url)
492
493 sys.stdout.write(' first using the normal urllib handlers')
494 # first use normal opener
495 opener = urllib2.build_opener()
496 urllib2.install_opener(opener)
497 t1 = fetch(N, url)
498 print ' TIME: %.3f s' % t1
499
500 sys.stdout.write(' now using the keepalive handler ')
501 # now install the keepalive handler and try again
502 opener = urllib2.build_opener(HTTPHandler())
503 urllib2.install_opener(opener)
504 t2 = fetch(N, url)
505 print ' TIME: %.3f s' % t2
506 print ' improvement factor: %.2f' % (t1/t2, )
507
508 def fetch(N, url, delay=0):
509 import time
510 lens = []
511 starttime = time.time()
512 for i in range(N):
513 if delay and i > 0: time.sleep(delay)
514 fo = urllib2.urlopen(url)
515 foo = fo.read()
516 fo.close()
517 lens.append(len(foo))
518 diff = time.time() - starttime
519
520 j = 0
521 for i in lens[1:]:
522 j = j + 1
523 if not i == lens[0]:
524 print "WARNING: inconsistent length on read %i: %i" % (j, i)
525
526 return diff
527
528 def test_timeout(url):
529 global DEBUG
530 dbbackup = DEBUG
531 class FakeLogger:
532 def debug(self, msg, *args): print msg % args
533 info = warning = error = debug
534 DEBUG = FakeLogger()
535 print " fetching the file to establish a connection"
536 fo = urllib2.urlopen(url)
537 data1 = fo.read()
538 fo.close()
539
540 i = 20
541 print " waiting %i seconds for the server to close the connection" % i
542 while i > 0:
543 sys.stdout.write('\r %2i' % i)
544 sys.stdout.flush()
545 time.sleep(1)
546 i -= 1
547 sys.stderr.write('\r')
548
549 print " fetching the file a second time"
550 fo = urllib2.urlopen(url)
551 data2 = fo.read()
552 fo.close()
553
554 if data1 == data2:
555 print ' data are identical'
556 else:
557 print ' ERROR: DATA DIFFER'
558
559 DEBUG = dbbackup
560
561
562 def test(url, N=10):
563 print "checking error hander (do this on a non-200)"
564 try: error_handler(url)
565 except IOError, e:
566 print "exiting - exception will prevent further tests"
567 sys.exit()
568 print
569 print "performing continuity test (making sure stuff isn't corrupted)"
570 continuity(url)
571 print
572 print "performing speed comparison"
573 comp(N, url)
574 print
575 print "performing dropped-connection check"
576 test_timeout(url)
577
578 if __name__ == '__main__':
579 import time
580 import sys
581 try:
582 N = int(sys.argv[1])
583 url = sys.argv[2]
584 except:
585 print "%s <integer> <url>" % sys.argv[0]
586 else:
587 test(url, N)