comparison mercurial/localrepo.py @ 2612:ffb895f16925

Add support for streaming clone. The existing clone code uses pull to get changes from the remote repo, which is very slow and uses a lot of memory and CPU. With the new clone code, the server writes file data straight to the client, and the client writes that file data straight to disk. Memory and CPU usage are very low, and clone is much faster over a LAN. A new client can still clone with pull and can still clone from older servers. A new server can still serve older clients.
author Vadim Gelfer <vadim.gelfer@gmail.com>
date Fri, 14 Jul 2006 11:17:22 -0700
parents 00fc88b0b256
children 479e26afa10f
comparison
equal deleted inserted replaced
2611:1b4eb1f92433 2612:ffb895f16925
6 # of the GNU General Public License, incorporated herein by reference. 6 # of the GNU General Public License, incorporated herein by reference.
7 7
8 from node import * 8 from node import *
9 from i18n import gettext as _ 9 from i18n import gettext as _
10 from demandload import * 10 from demandload import *
11 import repo
11 demandload(globals(), "appendfile changegroup") 12 demandload(globals(), "appendfile changegroup")
12 demandload(globals(), "changelog dirstate filelog manifest repo context") 13 demandload(globals(), "changelog dirstate filelog manifest context")
13 demandload(globals(), "re lock transaction tempfile stat mdiff errno ui") 14 demandload(globals(), "re lock transaction tempfile stat mdiff errno ui")
14 demandload(globals(), "os revlog util") 15 demandload(globals(), "os revlog time util")
15 16
16 class localrepository(object): 17 class localrepository(repo.repository):
17 capabilities = () 18 capabilities = ()
18 19
19 def __del__(self): 20 def __del__(self):
20 self.transhandle = None 21 self.transhandle = None
21 def __init__(self, parentui, path=None, create=0): 22 def __init__(self, parentui, path=None, create=0):
23 repo.repository.__init__(self)
22 if not path: 24 if not path:
23 p = os.getcwd() 25 p = os.getcwd()
24 while not os.path.isdir(os.path.join(p, ".hg")): 26 while not os.path.isdir(os.path.join(p, ".hg")):
25 oldp = p 27 oldp = p
26 p = os.path.dirname(p) 28 p = os.path.dirname(p)
1181 # repo (local filesystem, old ssh servers). 1183 # repo (local filesystem, old ssh servers).
1182 # 1184 #
1183 # unbundle assumes local user cannot lock remote repo (new ssh 1185 # unbundle assumes local user cannot lock remote repo (new ssh
1184 # servers, http servers). 1186 # servers, http servers).
1185 1187
1186 if 'unbundle' in remote.capabilities: 1188 if remote.capable('unbundle'):
1187 return self.push_unbundle(remote, force, revs) 1189 return self.push_unbundle(remote, force, revs)
1188 return self.push_addchangegroup(remote, force, revs) 1190 return self.push_addchangegroup(remote, force, revs)
1189 1191
1190 def prepush(self, remote, force, revs): 1192 def prepush(self, remote, force, revs):
1191 base = {} 1193 base = {}
2199 self.ui.warn(_("%d warnings encountered!\n") % warnings[0]) 2201 self.ui.warn(_("%d warnings encountered!\n") % warnings[0])
2200 if errors[0]: 2202 if errors[0]:
2201 self.ui.warn(_("%d integrity errors encountered!\n") % errors[0]) 2203 self.ui.warn(_("%d integrity errors encountered!\n") % errors[0])
2202 return 1 2204 return 1
2203 2205
def stream_in(self, remote):
    '''fill this repository from the raw file stream sent by remote.

    the stream obtained from remote.stream_out() is read as:
      - one header line: "<file count> <total byte count>"
      - for each file: one line "<name>NUL<size>", followed by
        <size> bytes of file data

    each file is written through self.opener; afterwards the
    repository is reloaded so it sees the new files.

    returns len(self.heads()) + 1.'''
    self.ui.status(_('streaming all changes\n'))
    fp = remote.stream_out()
    total_files, total_bytes = map(int, fp.readline().split(' ', 1))
    self.ui.status(_('%d files to transfer, %s of data\n') %
                   (total_files, util.bytecount(total_bytes)))
    start = time.time()
    for i in xrange(total_files):
        name, size = fp.readline().split('\0', 1)
        size = int(size)
        self.ui.debug('adding %s (%s)\n' % (name, util.bytecount(size)))
        ofp = self.opener(name, 'w')
        try:
            # copy exactly this file's bytes; the next header line
            # starts right after them in the same stream
            for chunk in util.filechunkiter(fp, limit=size):
                ofp.write(chunk)
        finally:
            # release the file handle even if the transfer breaks
            # off in the middle of a file
            ofp.close()
    elapsed = time.time() - start
    if elapsed <= 0:
        # a tiny transfer can finish within the clock resolution;
        # avoid dividing by zero when computing the rate below
        elapsed = 0.001
    self.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                   (util.bytecount(total_bytes), elapsed,
                    util.bytecount(total_bytes / elapsed)))
    self.reload()
    # NOTE(review): the "+ 1" presumably mirrors the return convention
    # of the pull path so callers can treat both alike -- confirm
    return len(self.heads()) + 1
2227
def clone(self, remote, heads=[], pull=False):
    '''copy the contents of remote into this repository.

    the streaming transfer is preferred whenever it is allowed;
    otherwise changes are fetched with a regular pull.

    keyword arguments:
    heads: list of revs to clone (forces use of pull)
    pull: force use of pull, even if remote can stream'''

    # at present every client able to stream can read every repo
    # format that a streaming server may hand out.

    # should the revlog format ever change, the client will need to
    # inspect version and format flags on the "stream" capability
    # and only stream when they are compatible.

    # the remote is only asked about streaming when neither a forced
    # pull nor a restriction to specific heads rules it out.
    may_stream = not (pull or heads) and remote.capable('stream')
    if may_stream:
        return self.stream_in(remote)
    return self.pull(remote, heads)
2246
2204 # used to avoid circular references so destructors work 2247 # used to avoid circular references so destructors work
2205 def aftertrans(base): 2248 def aftertrans(base):
2206 p = base 2249 p = base
2207 def a(): 2250 def a():
2208 util.rename(os.path.join(p, "journal"), os.path.join(p, "undo")) 2251 util.rename(os.path.join(p, "journal"), os.path.join(p, "undo"))