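'''
lszip: list the contents of a remote ZIP archive and download selected entries
without fetching the whole archive, using HTTP/1.1 Range requests.

Example invocations (the URL is a placeholder):

    python lszip.py http://example.com/archive.zip                 # list entries with their IDs
    python lszip.py http://example.com/archive.zip --download 0,3  # download entries 0 and 3
    python lszip.py http://example.com/archive.zip --cwd /tmp/out --download 2
'''
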
import argparse
import os
import struct
import sys
import zlib

import requests

debug = False
if debug:
    import http.client
    http.client.HTTPConnection.debuglevel = 5

# Structure "End of central directory(ECD)" supports variable length
# comments at the end of file. Length of comment is specified by 2 bytes
# unsigned integer field in ECD.
ZIP_ECD_MAX_COMMENT = (1 << 8*2) - 1
# Structure of "End of central directory" specified by standard
# excluding comment as its length is not known beforehand
structECD = '<I4HIIH'
sizeECD = struct.calcsize(structECD)
ZIP_ECD_MAX_SIZE = sizeECD + ZIP_ECD_MAX_COMMENT
# First 4 bytes of ECD should match this
signECD = 0x06054b50
# Indices of entries in ECD
_ECD_SIGNATURE = 0
_ECD_DISK_NUMBER = 1
_ECD_DISK_START = 2
_ECD_ENTRIES_THIS_DISK = 3
_ECD_ENTRIES_TOTAL = 4
_ECD_SIZE = 5
_ECD_OFFSET = 6
_ECD_COMMENT_SIZE = 7
# Structure of "Central Directory(CD) Header"
# excluding variable fields: file_name, extra_field, and file_comment
structCD = '<I6H3I5H2I'
sizeCD = struct.calcsize(structCD)
# First 4 bytes of CD header should match this
signCD = 0x02014b50
# Indices of entries in CD
# indexes of entries in the central directory structure
_CD_SIGNATURE = 0
_CD_VERSION_MADE_BY = 1
_CD_VERSION_TO_EXTRACT = 2
_CD_COMPRESSION = 3
_CD_GP_BIT_FLAG = 4
_CD_TIME = 5
_CD_DATE = 6
_CD_CRC = 7
_CD_COMPRESSED_SIZE = 8
_CD_UNCOMPRESSED_SIZE = 9
_CD_FILENAME_LENGTH = 10
_CD_EXTRA_FIELD_LENGTH = 11
_CD_COMMENT_LENGTH = 12
_CD_DISK_NUMBER_START = 13
_CD_INTERNAL_FILE_ATTRIBUTES = 14
_CD_EXTERNAL_FILE_ATTRIBUTES = 15
_CD_LOCAL_HEADER_OFFSET = 16
# Structure of "Local File Header"
# excluding variable fields: file_name and extra_field
structLHeader = '<I5H3I2H'
sizeLHeader = struct.calcsize(structLHeader)
# First 4 bytes of Local File header should match this
signLHeader = 0x04034b50
# Indices of entries in Local File header
_LH_SIGNATURE = 0
_LH_VERSION_TO_EXTRACT = 1
_LH_GP_BIT_FLAG = 2
_LH_COMPRESSION = 3
_LH_TIME = 4
_LH_DATE = 5
_LH_CRC = 6
_LH_COMPRESSED_SIZE = 7
_LH_UNCOMPRESSED_SIZE = 8
_LH_FILENAME_LENGTH = 9
_LH_EXTRA_FIELD_LENGTH = 10
# Compression Methods
_COMPR_STORED = 0
_COMPR_DEFLATE = 8
def generate_range_header(lowByte=0, highByte=''):
    '''
    Returns a dict such as {'Range': 'bytes=22-300'},
    as per the HTTP/1.1 Range header description:
    http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
    Note that both end values are inclusive.
    '''
    if lowByte < 0:
        # A negative low byte means indexing backwards from the end of the
        # file; the high byte is not significant in that case.
        # E.g. {'Range': 'bytes=-22'} requests the last 22 bytes.
        return {'Range': 'bytes=' + str(lowByte)}
    # E.g. {'Range': 'bytes=22-200'}, {'Range': 'bytes=22-'}
    return {'Range': 'bytes=%s-%s' % (lowByte, highByte)}

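# For reference, the headers this helper produces (illustrative byte offsets):
#   generate_range_header(22, 300) -> {'Range': 'bytes=22-300'}   bytes 22-300 inclusive
#   generate_range_header(22)      -> {'Range': 'bytes=22-'}      byte 22 to end of file
#   generate_range_header(-22)     -> {'Range': 'bytes=-22'}      last 22 bytes
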
def zip_get_valid_ecd(bytes):
    '''
    Given bytes of data, validate them as an "End of Central Directory"
    record and return the unpacked record, or None otherwise.
    Two scenarios are accepted:
    * ECD with no ZIP archive comment
    * ECD followed by a ZIP archive comment
    Any extra bytes invalidate the data.
    '''
    # Unpack the fixed-size part, then check the signature and comment length
    ecd = struct.unpack(structECD, bytes[:sizeECD])
    # Check signature
    if ecd[_ECD_SIGNATURE] != signECD:
        return None
    # Signature is correct; check that the comment length is consistent
    if (sizeECD + ecd[_ECD_COMMENT_SIZE]) != len(bytes):
        return None
    return ecd


def zip_get_ecd(bytes):
    '''
    Given bytes of data, search for the "End of Central Directory" record
    and return it, or None if it cannot be found.
    '''
    if len(bytes) < sizeECD:
        return None
    # Start at the last index where an ECD could possibly begin
    startIndex = len(bytes) - sizeECD
    while startIndex >= 0:
        ecd = zip_get_valid_ecd(bytes[startIndex:])
        if ecd:
            # Found it
            return ecd
        startIndex -= 1
    return None

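# A minimal sanity check for zip_get_ecd (purely illustrative values: an ECD
# for an empty central directory with no archive comment, preceded by junk):
#
#     fake_ecd = struct.pack(structECD, signECD, 0, 0, 0, 0, 0, 0, 0)
#     assert zip_get_ecd(b'junk' + fake_ecd)[_ECD_SIGNATURE] == signECD
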
class CDEntry(object):
    '''
    Holds the contents of one Central Directory entry, plus the file data
    and helpers needed to extract it.
    '''
    # id is assigned by ZIPRetriever.get_cd_entries and is used for
    # specifying which entries to fetch in download mode
    id = None
    file_data = None
    is_dir = False

    def __init__(self, bytes):
        '''
        bytes: raw bytes starting at the CD header, including filename,
        extra_field and file_comment. Any trailing bytes are ignored.
        '''
        central_dir_header = struct.unpack(structCD, bytes[:sizeCD])
        if central_dir_header[_CD_SIGNATURE] != signCD:
            raise Exception('Bad Central Directory Header')
        self.filename_length = central_dir_header[_CD_FILENAME_LENGTH]
        self.extra_field_length = central_dir_header[_CD_EXTRA_FIELD_LENGTH]
        self.comment_length = central_dir_header[_CD_COMMENT_LENGTH]
        self.local_header_offset = central_dir_header[_CD_LOCAL_HEADER_OFFSET]
        self.compressed_size = central_dir_header[_CD_COMPRESSED_SIZE]
        if self.compressed_size == 0:
            self.is_dir = True
        self.compression_method = central_dir_header[_CD_COMPRESSION]
        self.filename = bytes[sizeCD:sizeCD + self.filename_length].decode('utf-8')
        # Convert absolute paths to relative ones by stripping leading '/'
        self.filename = self.filename.lstrip('/')

    def __str__(self):
        return '%s : %s' % (self.id, self.filename)

    @property
    def total_size(self):
        '''
        Total size of this Central Directory entry, i.e.
        sizeCD + filename_length + extra_field_length + comment_length
        '''
        return sizeCD + self.filename_length + self.extra_field_length + self.comment_length

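# Worked example (illustrative numbers): a CD header that reports
# filename_length=9, extra_field_length=0 and comment_length=0 occupies
# sizeCD + 9 = 46 + 9 = 55 bytes; get_cd_entries below uses total_size to
# advance from one entry to the next.
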
class ZIPRetriever(object):
    '''
    Download helper that reuses a single requests session.
    '''
    session = None
    url = None
    # ecd is populated by get_ecd; archive_size is reserved but not currently set
    archive_size = None
    ecd = None
    # Working directory that extract() changes into before writing any files
    BASE_CWD = os.getcwd()

    def __init__(self, url, cwd):
        # Change the working directory if one was specified
        if cwd:
            self.BASE_CWD = cwd
        self.session = requests.Session()
        self.url = url

    def get_response(self, lowByte=0, highByte=''):
        '''
        Low-level helper that requests the byte range lowByte-highByte;
        the arguments have the same meaning as in generate_range_header.
        '''
        headers = generate_range_header(lowByte=lowByte, highByte=highByte)
        response = self.session.get(self.url, headers=headers)
        # If the body is at least as large as the biggest range we ever
        # request, the server must have honoured the Range header and
        # replied with 206 Partial Content.
        if not int(response.headers['content-length']) < ZIP_ECD_MAX_SIZE:
            assert response.status_code == 206
        return response

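    # For instance (illustrative offsets), get_response(-100) fetches the last
    # 100 bytes of the archive, while get_response(10, 19) fetches the ten
    # bytes at offsets 10-19 inclusive.
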
    def get_ecd(self):
        '''
        Returns the unpacked ECD record by downloading the last
        ZIP_ECD_MAX_SIZE bytes of the archive.
        '''
        # Fetch up to ZIP_ECD_MAX_SIZE bytes from the end of the file in case
        # the archive has a trailing comment
        request_data_size = ZIP_ECD_MAX_SIZE
        response = self.get_response(lowByte=-(request_data_size))
        self.ecd = zip_get_ecd(response.content)
        self.ecd_request_data = response.content
        return self.ecd

    def get_cd_bytes(self):
        '''
        Returns the archive's bytes starting at the central directory.
        '''
        ecd = self.get_ecd()
        if not ecd:
            raise Exception('Bad Zip File')
        # Central directory start offset, relative to the whole ZIP archive
        cd_start_offset = ecd[_ECD_OFFSET]
        r = self.get_response(lowByte=cd_start_offset)
        return r.content

    def get_cd_entries(self):
        '''
        Returns a list of CDEntry objects and also stores it on the object.
        '''
        # Bytes from the start of the central directory to the end of the file
        bytes = self.get_cd_bytes()
        cd_entries = []
        i = 0
        entry_pointer = 0
        # Walk the central directory header by header; the downloaded bytes
        # end with the ECD record (and possibly an archive comment), so stop
        # as soon as the data no longer starts with a CD header signature.
        while len(bytes) - entry_pointer >= sizeCD:
            if struct.unpack_from('<I', bytes, entry_pointer)[0] != signCD:
                break
            cd_entry = CDEntry(bytes[entry_pointer:])
            cd_entry.id = i
            cd_entries.append(cd_entry)
            entry_pointer += cd_entry.total_size
            i += 1
        self.cd_entries = cd_entries
        return cd_entries

    def get_local_header(self, cd_entry):
        '''
        Returns local header for given central dir entry
        '''
        local_header_offset = cd_entry.local_header_offset
        r = self.get_response(local_header_offset, local_header_offset + sizeLHeader - 1)
        local_header = struct.unpack(structLHeader, r.content)
        return local_header

    def get_file_data(self, cd_entry, local_header):
        '''
        Returns file data represented by given local_header and cd_entry
        '''
        data_start_offset = cd_entry.local_header_offset \
            + sizeLHeader \
            + local_header[_LH_FILENAME_LENGTH] \
            + local_header[_LH_EXTRA_FIELD_LENGTH]
        r = self.get_response(data_start_offset, data_start_offset + local_header[_LH_COMPRESSED_SIZE] - 1)
        return r.content

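    # Layout of a local entry in the archive, which is what the offset
    # arithmetic above relies on:
    #   [local file header (sizeLHeader bytes)][file name][extra field][compressed data]
    # Worked example (illustrative numbers): a local header at offset 1000 with
    # filename_length=9 and extra_field_length=0 puts the compressed data at
    # 1000 + 30 + 9 + 0 = 1039, spanning compressed_size bytes.
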
    def _extract(self, cd_entry):
        '''
        Extracts the data held by cd_entry to cd_entry.filename.
        The directory tree must already exist if the filename contains one.
        '''
        if cd_entry.compression_method not in (_COMPR_DEFLATE, _COMPR_STORED):
            return -1
        with open(cd_entry.filename, 'wb') as outfile:
            if cd_entry.compression_method == _COMPR_DEFLATE:
                # Negative wbits tells zlib this is a raw deflate stream
                # with no zlib header or trailer
                outfile.write(zlib.decompress(cd_entry.file_data, -15))
            elif cd_entry.compression_method == _COMPR_STORED:
                outfile.write(cd_entry.file_data)
        print("Extracted : %s" % (os.path.join(self.BASE_CWD, cd_entry.filename)))

    def _extract_dir(self, cd_entry):
        # Recheck that cd_entry really is a directory
        if cd_entry.is_dir:
            dirname = cd_entry.filename
            os.makedirs(dirname, exist_ok=True)
            for ce in self.cd_entries:
                # If dirname = 'windows/d/', then file1 = 'windows/d/e' and
                # file2 = 'windows/d/e/f.txt' are both downloaded, since they
                # share the common prefix 'windows/d/'
                if dirname == os.path.commonprefix([dirname, ce.filename]):
                    # Inner directories need not be extracted explicitly; they
                    # are created while extracting the files inside them
                    if not ce.is_dir:
                        self.extract(ce)

    def extract(self, cd_entry):
        '''
        Download the file or folder represented by cd_entry, creating the
        directory tree if needed.
        '''
        # Change to the download directory before extracting
        os.chdir(self.BASE_CWD)
        if cd_entry.is_dir:
            return self._extract_dir(cd_entry)
        else:
            local_header = self.get_local_header(cd_entry)
            data = self.get_file_data(cd_entry, local_header)
            cd_entry.file_data = data
            # Some archivers set this value only in the local header
            cd_entry.compression_method = local_header[_LH_COMPRESSION]
            # Create the directory tree if it does not exist yet
            parentdir = os.path.dirname(cd_entry.filename)
            if parentdir:
                os.makedirs(parentdir, exist_ok=True)
            return self._extract(cd_entry)

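# A minimal programmatic sketch of using ZIPRetriever directly, mirroring what
# main() does below (the URL is a placeholder):
#
#     retriever = ZIPRetriever('http://example.com/archive.zip', cwd=None)
#     for entry in retriever.get_cd_entries():
#         print(entry)                            # prints "<id> : <filename>"
#     retriever.extract(retriever.cd_entries[0])  # download only the first entry
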
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("url", help="ZIP File's URL")
    parser.add_argument("--nolist", action="store_true", default=False, help="Disable Listing of Files")
    parser.add_argument("--download", type=str,
                        help='List of Comma Separated IDs to download. IDs are listed in listing mode.')
    parser.add_argument("--cwd", type=str,
                        help='''Set current working directory where downloads are done.
                        Defaults to current directory. This directory should exist.''')
    args = parser.parse_args()
    url = args.url

    if args.cwd:
        # Convert to an absolute path so that os.chdir() works from any level
        args.cwd = os.path.abspath(args.cwd)

    retriever = ZIPRetriever(url, cwd=args.cwd)
    retriever.get_cd_entries()
    assert retriever.ecd[_ECD_ENTRIES_TOTAL] == len(retriever.cd_entries)

    if not args.nolist:
        for cd_entry in retriever.cd_entries:
            print(cd_entry)

    if args.download:
        download_ids = args.download.split(',')
        for id, cd_entry in enumerate(retriever.cd_entries):
            if str(id) in download_ids:
                retriever.extract(cd_entry)


if __name__ == '__main__':
    main()