# HG changeset patch # User Vadim Gelfer # Date 1152901042 25200 # Node ID ffb895f16925dc6248479b89cc7f77628dee7da9 # Parent 1b4eb1f924330227e33e462ee5e508fd8775432b add support for streaming clone. existing clone code uses pull to get changes from remote repo. is very slow, uses lots of memory and cpu. new clone code has server write file data straight to client, client writes file data straight to disk. memory and cpu used are very low, clone is much faster over lan. new client can still clone with pull, can still clone from older servers. new server can still serve older clients. diff --git a/mercurial/hg.py b/mercurial/hg.py --- a/mercurial/hg.py +++ b/mercurial/hg.py @@ -179,7 +179,7 @@ def clone(ui, source, dest=None, pull=Fa revs = [src_repo.lookup(r) for r in rev] if dest_repo.local(): - dest_repo.pull(src_repo, heads=revs) + dest_repo.clone(src_repo, heads=revs, pull=pull) elif src_repo.local(): src_repo.push(dest_repo, revs=revs) else: diff --git a/mercurial/hgweb/hgweb_mod.py b/mercurial/hgweb/hgweb_mod.py --- a/mercurial/hgweb/hgweb_mod.py +++ b/mercurial/hgweb/hgweb_mod.py @@ -11,7 +11,8 @@ import os.path import mimetypes from mercurial.demandload import demandload demandload(globals(), "re zlib ConfigParser mimetools cStringIO sys tempfile") -demandload(globals(), "mercurial:mdiff,ui,hg,util,archival,templater") +demandload(globals(), "mercurial:mdiff,ui,hg,util,archival,streamclone") +demandload(globals(), "mercurial:templater") demandload(globals(), "mercurial.hgweb.common:get_mtime,staticfile") from mercurial.node import * from mercurial.i18n import gettext as _ @@ -859,7 +860,7 @@ class hgweb(object): or self.t("error", error="%r not found" % fname)) def do_capabilities(self, req): - resp = 'unbundle' + resp = 'unbundle stream=%d' % (self.repo.revlogversion,) req.httphdr("application/mercurial-0.1", length=len(resp)) req.write(resp) @@ -950,3 +951,7 @@ class hgweb(object): finally: fp.close() os.unlink(tempname) + + def do_stream_out(self, req): + req.httphdr("application/mercurial-0.1") + streamclone.stream_out(self.repo, req) diff --git a/mercurial/httprepo.py b/mercurial/httprepo.py --- a/mercurial/httprepo.py +++ b/mercurial/httprepo.py @@ -326,6 +326,9 @@ class httprepository(remoterepository): fp.close() os.unlink(tempname) + def stream_out(self): + return self.do_cmd('stream_out') + class httpsrepository(httprepository): def __init__(self, ui, path): if not has_https: diff --git a/mercurial/localrepo.py b/mercurial/localrepo.py --- a/mercurial/localrepo.py +++ b/mercurial/localrepo.py @@ -8,17 +8,19 @@ from node import * from i18n import gettext as _ from demandload import * +import repo demandload(globals(), "appendfile changegroup") -demandload(globals(), "changelog dirstate filelog manifest repo context") +demandload(globals(), "changelog dirstate filelog manifest context") demandload(globals(), "re lock transaction tempfile stat mdiff errno ui") -demandload(globals(), "os revlog util") +demandload(globals(), "os revlog time util") -class localrepository(object): +class localrepository(repo.repository): capabilities = () def __del__(self): self.transhandle = None def __init__(self, parentui, path=None, create=0): + repo.repository.__init__(self) if not path: p = os.getcwd() while not os.path.isdir(os.path.join(p, ".hg")): @@ -1183,7 +1185,7 @@ class localrepository(object): # unbundle assumes local user cannot lock remote repo (new ssh # servers, http servers). - if 'unbundle' in remote.capabilities: + if remote.capable('unbundle'): return self.push_unbundle(remote, force, revs) return self.push_addchangegroup(remote, force, revs) @@ -2201,6 +2203,47 @@ class localrepository(object): self.ui.warn(_("%d integrity errors encountered!\n") % errors[0]) return 1 + def stream_in(self, remote): + self.ui.status(_('streaming all changes\n')) + fp = remote.stream_out() + total_files, total_bytes = map(int, fp.readline().split(' ', 1)) + self.ui.status(_('%d files to transfer, %s of data\n') % + (total_files, util.bytecount(total_bytes))) + start = time.time() + for i in xrange(total_files): + name, size = fp.readline().split('\0', 1) + size = int(size) + self.ui.debug('adding %s (%s)\n' % (name, util.bytecount(size))) + ofp = self.opener(name, 'w') + for chunk in util.filechunkiter(fp, limit=size): + ofp.write(chunk) + ofp.close() + elapsed = time.time() - start + self.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % + (util.bytecount(total_bytes), elapsed, + util.bytecount(total_bytes / elapsed))) + self.reload() + return len(self.heads()) + 1 + + def clone(self, remote, heads=[], pull=False): + '''clone remote repository. + if possible, changes are streamed from remote server. + + keyword arguments: + heads: list of revs to clone (forces use of pull) + pull: force use of pull, even if remote can stream''' + + # now, all clients that can stream can read repo formats + # supported by all servers that can stream. + + # if revlog format changes, client will have to check version + # and format flags on "stream" capability, and stream only if + # compatible. + + if not pull and not heads and remote.capable('stream'): + return self.stream_in(remote) + return self.pull(remote, heads) + # used to avoid circular references so destructors work def aftertrans(base): p = base diff --git a/mercurial/remoterepo.py b/mercurial/remoterepo.py --- a/mercurial/remoterepo.py +++ b/mercurial/remoterepo.py @@ -5,7 +5,9 @@ # This software may be used and distributed according to the terms # of the GNU General Public License, incorporated herein by reference. -class remoterepository(object): +import repo + +class remoterepository(repo.repository): def dev(self): return -1 diff --git a/mercurial/repo.py b/mercurial/repo.py --- a/mercurial/repo.py +++ b/mercurial/repo.py @@ -5,4 +5,19 @@ # This software may be used and distributed according to the terms # of the GNU General Public License, incorporated herein by reference. -class RepoError(Exception): pass +class RepoError(Exception): + pass + +class repository(object): + def capable(self, name): + '''tell whether repo supports named capability. + return False if not supported. + if boolean capability, return True. + if string capability, return string.''' + name_eq = name + '=' + for cap in self.capabilities: + if name == cap: + return True + if cap.startswith(name_eq): + return cap[len(name_eq):] + return False diff --git a/mercurial/sshrepo.py b/mercurial/sshrepo.py --- a/mercurial/sshrepo.py +++ b/mercurial/sshrepo.py @@ -198,3 +198,6 @@ class sshrepository(remoterepository): if not r: return 1 return int(r) + + def stream_out(self): + return self.do_cmd('stream_out') diff --git a/mercurial/sshserver.py b/mercurial/sshserver.py --- a/mercurial/sshserver.py +++ b/mercurial/sshserver.py @@ -8,7 +8,7 @@ from demandload import demandload from i18n import gettext as _ from node import * -demandload(globals(), "os sys tempfile util") +demandload(globals(), "os streamclone sys tempfile util") class sshserver(object): def __init__(self, ui, repo): @@ -60,7 +60,7 @@ class sshserver(object): capabilities: space separated list of tokens ''' - r = "capabilities: unbundle\n" + r = "capabilities: unbundle stream=%d\n" % (self.repo.revlogversion,) self.respond(r) def do_lock(self): @@ -167,3 +167,5 @@ class sshserver(object): fp.close() os.unlink(tempname) + def do_stream_out(self): + streamclone.stream_out(self.repo, self.fout) diff --git a/mercurial/streamclone.py b/mercurial/streamclone.py new file mode 100644 --- /dev/null +++ b/mercurial/streamclone.py @@ -0,0 +1,82 @@ +# streamclone.py - streaming clone server support for mercurial +# +# Copyright 2006 Vadim Gelfer +# +# This software may be used and distributed according to the terms +# of the GNU General Public License, incorporated herein by reference. + +from demandload import demandload +from i18n import gettext as _ +demandload(globals(), "os stat util") + +# if server supports streaming clone, it advertises "stream" +# capability with value that is version+flags of repo it is serving. +# client only streams if it can read that repo format. + +def walkrepo(root): + '''iterate over metadata files in repository. + walk in natural (sorted) order. + yields 2-tuples: name of .d or .i file, size of file.''' + + strip_count = len(root) + len(os.sep) + def walk(path, recurse): + ents = os.listdir(path) + ents.sort() + for e in ents: + pe = os.path.join(path, e) + st = os.lstat(pe) + if stat.S_ISDIR(st.st_mode): + if recurse: + for x in walk(pe, True): + yield x + else: + if not stat.S_ISREG(st.st_mode) or len(e) < 2: + continue + sfx = e[-2:] + if sfx in ('.d', '.i'): + yield pe[strip_count:], st.st_size + # write file data first + for x in walk(os.path.join(root, 'data'), True): + yield x + # write manifest before changelog + meta = list(walk(root, False)) + meta.sort(reverse=True) + for x in meta: + yield x + +# stream file format is simple. +# +# server writes out line that says how many files, how many total +# bytes. separator is ascii space, byte counts are strings. +# +# then for each file: +# +# server writes out line that says file name, how many bytes in +# file. separator is ascii nul, byte count is string. +# +# server writes out raw file data. + +def stream_out(repo, fileobj): + '''stream out all metadata files in repository. + writes to file-like object, must support write() and optional flush().''' + # get consistent snapshot of repo. lock during scan so lock not + # needed while we stream, and commits can happen. + lock = repo.lock() + repo.ui.debug('scanning\n') + entries = [] + total_bytes = 0 + for name, size in walkrepo(repo.path): + entries.append((name, size)) + total_bytes += size + lock.release() + + repo.ui.debug('%d files, %d bytes to transfer\n' % + (len(entries), total_bytes)) + fileobj.write('%d %d\n' % (len(entries), total_bytes)) + for name, size in entries: + repo.ui.debug('sending %s (%d bytes)\n' % (name, size)) + fileobj.write('%s\0%d\n' % (name, size)) + for chunk in util.filechunkiter(repo.opener(name), limit=size): + fileobj.write(chunk) + flush = getattr(fileobj, 'flush', None) + if flush: flush() diff --git a/mercurial/util.py b/mercurial/util.py --- a/mercurial/util.py +++ b/mercurial/util.py @@ -961,3 +961,24 @@ def rcpath(): else: _rcpath = os_rcpath() return _rcpath + +def bytecount(nbytes): + '''return byte count formatted as readable string, with units''' + + units = ( + (100, 1<<30, _('%.0f GB')), + (10, 1<<30, _('%.1f GB')), + (1, 1<<30, _('%.2f GB')), + (100, 1<<20, _('%.0f MB')), + (10, 1<<20, _('%.1f MB')), + (1, 1<<20, _('%.2f MB')), + (100, 1<<10, _('%.0f KB')), + (10, 1<<10, _('%.1f KB')), + (1, 1<<10, _('%.2f KB')), + (1, 1, _('%.0f bytes')), + ) + + for multiplier, divisor, format in units: + if nbytes >= divisor * multiplier: + return format % (nbytes / float(divisor)) + return units[-1][2] % nbytes diff --git a/tests/test-http b/tests/test-http new file mode 100755 --- /dev/null +++ b/tests/test-http @@ -0,0 +1,25 @@ +#!/bin/sh + +mkdir test +cd test +echo foo>foo +hg init +hg addremove +hg commit -m 1 +hg verify +hg serve -p 20059 -d --pid-file=hg.pid +cat hg.pid >> $DAEMON_PIDS +cd .. + +echo % clone via stream +http_proxy= hg clone http://localhost:20059/ copy 2>&1 | \ + sed -e 's/[0-9][0-9.]*/XXX/g' +cd copy +hg verify + +cd .. + +echo % clone via pull +http_proxy= hg clone --pull http://localhost:20059/ copy-pull +cd copy-pull +hg verify diff --git a/tests/test-http-proxy b/tests/test-http-proxy --- a/tests/test-http-proxy +++ b/tests/test-http-proxy @@ -13,17 +13,27 @@ echo $! > proxy.pid) cat proxy.pid >> $DAEMON_PIDS sleep 2 -echo %% url for proxy -http_proxy=http://localhost:20060/ hg --config http_proxy.always=True clone http://localhost:20059/ b +echo %% url for proxy, stream +http_proxy=http://localhost:20060/ hg --config http_proxy.always=True clone http://localhost:20059/ b | \ + sed -e 's/[0-9][0-9.]*/XXX/g' +cd b +hg verify +cd .. + +echo %% url for proxy, pull +http_proxy=http://localhost:20060/ hg --config http_proxy.always=True clone --pull http://localhost:20059/ b-pull +cd b-pull +hg verify +cd .. echo %% host:port for proxy -http_proxy=localhost:20060 hg clone --config http_proxy.always=True http://localhost:20059/ c +http_proxy=localhost:20060 hg clone --pull --config http_proxy.always=True http://localhost:20059/ c echo %% proxy url with user name and password -http_proxy=http://user:passwd@localhost:20060 hg clone --config http_proxy.always=True http://localhost:20059/ d +http_proxy=http://user:passwd@localhost:20060 hg clone --pull --config http_proxy.always=True http://localhost:20059/ d echo %% url with user name and password -http_proxy=http://user:passwd@localhost:20060 hg clone --config http_proxy.always=True http://user:passwd@localhost:20059/ e +http_proxy=http://user:passwd@localhost:20060 hg clone --pull --config http_proxy.always=True http://user:passwd@localhost:20059/ e echo %% bad host:port for proxy http_proxy=localhost:20061 hg clone --config http_proxy.always=True http://localhost:20059/ f diff --git a/tests/test-http-proxy.out b/tests/test-http-proxy.out --- a/tests/test-http-proxy.out +++ b/tests/test-http-proxy.out @@ -1,11 +1,26 @@ adding a -%% url for proxy +%% url for proxy, stream +streaming all changes +XXX files to transfer, XXX bytes of data +transferred XXX bytes in XXX seconds (XXX KB/sec) +XXX files updated, XXX files merged, XXX files removed, XXX files unresolved +checking changesets +checking manifests +crosschecking files in changesets and manifests +checking files +1 files, 1 changesets, 1 total revisions +%% url for proxy, pull requesting all changes adding changesets adding manifests adding file changes added 1 changesets with 1 changes to 1 files 1 files updated, 0 files merged, 0 files removed, 0 files unresolved +checking changesets +checking manifests +crosschecking files in changesets and manifests +checking files +1 files, 1 changesets, 1 total revisions %% host:port for proxy requesting all changes adding changesets diff --git a/tests/test-http.out b/tests/test-http.out new file mode 100644 --- /dev/null +++ b/tests/test-http.out @@ -0,0 +1,29 @@ +(the addremove command is deprecated; use add and remove --after instead) +adding foo +checking changesets +checking manifests +crosschecking files in changesets and manifests +checking files +1 files, 1 changesets, 1 total revisions +% clone via stream +streaming all changes +XXX files to transfer, XXX bytes of data +transferred XXX bytes in XXX seconds (XXX KB/sec) +XXX files updated, XXX files merged, XXX files removed, XXX files unresolved +checking changesets +checking manifests +crosschecking files in changesets and manifests +checking files +1 files, 1 changesets, 1 total revisions +% clone via pull +requesting all changes +adding changesets +adding manifests +adding file changes +added 1 changesets with 1 changes to 1 files +1 files updated, 0 files merged, 0 files removed, 0 files unresolved +checking changesets +checking manifests +crosschecking files in changesets and manifests +checking files +1 files, 1 changesets, 1 total revisions diff --git a/tests/test-pull b/tests/test-pull --- a/tests/test-pull +++ b/tests/test-pull @@ -11,7 +11,7 @@ hg serve -p 20059 -d --pid-file=hg.pid cat hg.pid >> $DAEMON_PIDS cd .. -http_proxy= hg clone http://localhost:20059/ copy +http_proxy= hg clone --pull http://localhost:20059/ copy cd copy hg verify hg co diff --git a/tests/test-ssh b/tests/test-ssh --- a/tests/test-ssh +++ b/tests/test-ssh @@ -30,8 +30,15 @@ hg ci -A -m "init" -d "1000000 0" foo cd .. -echo "# clone remote" -hg clone -e ./dummyssh ssh://user@dummy/remote local +echo "# clone remote via stream" +hg clone -e ./dummyssh ssh://user@dummy/remote local-stream 2>&1 | \ + sed -e 's/[0-9][0-9.]*/XXX/g' +cd local-stream +hg verify +cd .. + +echo "# clone remote via pull" +hg clone -e ./dummyssh --pull ssh://user@dummy/remote local echo "# verify" cd local diff --git a/tests/test-ssh.out b/tests/test-ssh.out --- a/tests/test-ssh.out +++ b/tests/test-ssh.out @@ -1,5 +1,15 @@ # creating 'remote' -# clone remote +# clone remote via stream +streaming all changes +XXX files to transfer, XXX bytes of data +transferred XXX bytes in XXX seconds (XXX KB/sec) +XXX files updated, XXX files merged, XXX files removed, XXX files unresolved +checking changesets +checking manifests +crosschecking files in changesets and manifests +checking files +1 files, 1 changesets, 1 total revisions +# clone remote via pull requesting all changes adding changesets adding manifests @@ -70,6 +80,7 @@ remote: added 1 changesets with 1 change Got arguments 1:user@dummy 2:hg -R remote serve --stdio 3: 4: 5: Got arguments 1:user@dummy 2:hg -R remote serve --stdio 3: 4: 5: Got arguments 1:user@dummy 2:hg -R remote serve --stdio 3: 4: 5: +Got arguments 1:user@dummy 2:hg -R remote serve --stdio 3: 4: 5: Got arguments 1:user@dummy 2:hg -R local serve --stdio 3: 4: 5: Got arguments 1:user@dummy 2:hg -R remote serve --stdio 3: 4: 5: Got arguments 1:user@dummy 2:hg -R remote serve --stdio 3: 4: 5: