comparison mercurial/hg.py @ 635:85e2209d401c

Protocol switch from using generators to stream-like objects. This allows the the pull side to precisely control how much data is read so that another encapsulation layer is not needed. An http client gets a response with a finite size. Because ssh clients need to keep the stream open, we must not read more data than is sent in a response. But due to the streaming nature of the changegroup scheme, only the piece that's parsing the data knows how far it's allowed to read. This means the generator scheme isn't fine-grained enough. Instead we need file-like objects with a read(x) method. This switches everything for push/pull over to using file-like objects rather than generators.
author Matt Mackall <mpm@selenic.com>
date Wed, 06 Jul 2005 22:20:12 -0800
parents da5378d39269
children ac0ec421e3a5
comparison
equal deleted inserted replaced
634:da5378d39269 635:85e2209d401c
1023 1023
1024 cg = self.changegroup(update) 1024 cg = self.changegroup(update)
1025 return remote.addchangegroup(cg) 1025 return remote.addchangegroup(cg)
1026 1026
1027 def changegroup(self, basenodes): 1027 def changegroup(self, basenodes):
1028 nodes = self.newer(basenodes)
1029
1030 # construct the link map
1031 linkmap = {}
1032 for n in nodes:
1033 linkmap[self.changelog.rev(n)] = n
1034
1035 # construct a list of all changed files
1036 changed = {}
1037 for n in nodes:
1038 c = self.changelog.read(n)
1039 for f in c[3]:
1040 changed[f] = 1
1041 changed = changed.keys()
1042 changed.sort()
1043
1044 # the changegroup is changesets + manifests + all file revs
1045 revs = [ self.changelog.rev(n) for n in nodes ]
1046
1047 for y in self.changelog.group(linkmap): yield y
1048 for y in self.manifest.group(linkmap): yield y
1049 for f in changed:
1050 yield struct.pack(">l", len(f) + 4) + f
1051 g = self.file(f).group(linkmap)
1052 for y in g:
1053 yield y
1054
1055 def addchangegroup(self, generator):
1056
1057 class genread: 1028 class genread:
1058 def __init__(self, generator): 1029 def __init__(self, generator):
1059 self.g = generator 1030 self.g = generator
1060 self.buf = "" 1031 self.buf = ""
1061 def read(self, l): 1032 def read(self, l):
1065 except StopIteration: 1036 except StopIteration:
1066 break 1037 break
1067 d, self.buf = self.buf[:l], self.buf[l:] 1038 d, self.buf = self.buf[:l], self.buf[l:]
1068 return d 1039 return d
1069 1040
1041 def gengroup():
1042 nodes = self.newer(basenodes)
1043
1044 # construct the link map
1045 linkmap = {}
1046 for n in nodes:
1047 linkmap[self.changelog.rev(n)] = n
1048
1049 # construct a list of all changed files
1050 changed = {}
1051 for n in nodes:
1052 c = self.changelog.read(n)
1053 for f in c[3]:
1054 changed[f] = 1
1055 changed = changed.keys()
1056 changed.sort()
1057
1058 # the changegroup is changesets + manifests + all file revs
1059 revs = [ self.changelog.rev(n) for n in nodes ]
1060
1061 for y in self.changelog.group(linkmap): yield y
1062 for y in self.manifest.group(linkmap): yield y
1063 for f in changed:
1064 yield struct.pack(">l", len(f) + 4) + f
1065 g = self.file(f).group(linkmap)
1066 for y in g:
1067 yield y
1068
1069 yield struct.pack(">l", 0)
1070
1071 return genread(gengroup())
1072
1073 def addchangegroup(self, source):
1074
1070 def getchunk(): 1075 def getchunk():
1071 d = source.read(4) 1076 d = source.read(4)
1072 if not d: return "" 1077 if not d: return ""
1073 l = struct.unpack(">l", d)[0] 1078 l = struct.unpack(">l", d)[0]
1074 if l <= 4: return "" 1079 if l <= 4: return ""
1085 return self.changelog.count() 1090 return self.changelog.count()
1086 1091
1087 def revmap(x): 1092 def revmap(x):
1088 return self.changelog.rev(x) 1093 return self.changelog.rev(x)
1089 1094
1090 if not generator: return 1095 if not source: return
1091 changesets = files = revisions = 0 1096 changesets = files = revisions = 0
1092 1097
1093 source = genread(generator)
1094 tr = self.transaction() 1098 tr = self.transaction()
1095 1099
1096 # pull off the changeset group 1100 # pull off the changeset group
1097 self.ui.status("adding changesets\n") 1101 self.ui.status("adding changesets\n")
1098 co = self.changelog.tip() 1102 co = self.changelog.tip()
1590 self.ui.warn("unexpected response:\n" + d[:400] + "\n...\n") 1594 self.ui.warn("unexpected response:\n" + d[:400] + "\n...\n")
1591 raise 1595 raise
1592 1596
1593 def changegroup(self, nodes): 1597 def changegroup(self, nodes):
1594 n = " ".join(map(hex, nodes)) 1598 n = " ".join(map(hex, nodes))
1595 zd = zlib.decompressobj()
1596 f = self.do_cmd("changegroup", roots=n) 1599 f = self.do_cmd("changegroup", roots=n)
1597 bytes = 0 1600 bytes = 0
1598 while 1: 1601
1599 d = f.read(4096) 1602 class zread:
1600 bytes += len(d) 1603 def __init__(self, f):
1601 if not d: 1604 self.zd = zlib.decompressobj()
1602 yield zd.flush() 1605 self.f = f
1603 break 1606 self.buf = ""
1604 yield zd.decompress(d) 1607 def read(self, l):
1605 self.ui.note("%d bytes of data transfered\n" % bytes) 1608 while l > len(self.buf):
1609 r = f.read(4096)
1610 if r:
1611 self.buf += self.zd.decompress(r)
1612 else:
1613 self.buf += self.zd.flush()
1614 break
1615 d, self.buf = self.buf[:l], self.buf[l:]
1616 return d
1617
1618 return zread(f)
1619
1606 1620
1607 class sshrepository: 1621 class sshrepository:
1608 def __init__(self, ui, path): 1622 def __init__(self, ui, path):
1609 self.url = path 1623 self.url = path
1610 self.ui = ui 1624 self.ui = ui
1678 raise 1692 raise
1679 1693
1680 def changegroup(self, nodes): 1694 def changegroup(self, nodes):
1681 n = " ".join(map(hex, nodes)) 1695 n = " ".join(map(hex, nodes))
1682 f = self.do_cmd("changegroup", roots=n) 1696 f = self.do_cmd("changegroup", roots=n)
1683 bytes = 0 1697 return self.pipei
1684 while 1:
1685 l = struct.unpack(">l", f.read(4))[0]
1686 if l == -1: break
1687 d = f.read(l)
1688 bytes += len(d)
1689 yield d
1690 self.ui.note("%d bytes of data transfered\n" % bytes)
1691 1698
1692 def repository(ui, path=None, create=0): 1699 def repository(ui, path=None, create=0):
1693 if path: 1700 if path:
1694 if path.startswith("http://"): 1701 if path.startswith("http://"):
1695 return httprepository(ui, path) 1702 return httprepository(ui, path)