This repository has been archived by the owner on Mar 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rlecompressor.py
137 lines (110 loc) · 4.67 KB
/
rlecompressor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python3
import sys
import os
# shutil.disk_usage() only exists on python 3.3 and above. importing shutil succeeds on older
# versions as well, so an ImportError check never triggers -- check the interpreter version instead
import shutil
if sys.version_info < (3, 3):
    sys.exit('error: python 3.3 or above is required for this to operate')
"""
RLE (run-length encoding) compression tool
Data is compressed by counting the occurrences of one byte before a different byte.
The occurrence count of that one byte is then associated with that byte and it goes on.
For instance, three consecutive 0x08 bytes are stored as the pair '0x08 0x02': the count starts from 0x00, so a stored count of n stands for n + 1 occurrences.
Further bytes are appended. Index 0 would return the first byte and index 1 returns the occurrence count.
Index 2 would be the second byte, index 3 is the occurrence count for that byte. And so forth.
To decompress the compressed data, the occurrence count is used to determine how much of each byte needs to be put out.
"""
# read operation
def unpack(file):
    """Read the whole input file and return its content as bytes.

    Exits the program with an error message when the file does not exist or
    cannot be opened.
    """
    try:
        f = open(file, 'rb')
    except FileNotFoundError:
        sys.exit('error: failed to read input file as it does not exist') # file does not exist
    except IOError:
        sys.exit('error: failed to read input file. are permissions correctly set?') # generic i/o error -- likely permissions
    # the context manager closes the handle even when read() raises
    with f:
        return f.read()
# free space check
def storage(file, dat):
    """Verify that the destination of file has room for dat.

    The free space of the directory that will hold file is compared with
    len(dat). Exits the program with an error message unless more than
    len(dat) bytes are free; otherwise returns None.
    """
    # shutil wants an existing path, so use the directory part of the full destination path
    targetDir = os.path.dirname(os.path.abspath(file))
    headroom = shutil.disk_usage(targetDir).free - len(dat)
    if headroom > 0:
        return None
    sys.exit('error: output file destination has insufficient space')
# write operation
def pack(file, dat):
    """Convert dat to bytes and write it to the output file.

    dat is the buffer of integers built by compress or decompress. Returns the
    bytes object that was written. Exits the program with an error message
    when a value does not fit in a single byte, when the destination has
    insufficient free space, or when the output file cannot be opened.
    """
    try:
        combineDat = bytes(dat) # change list to bytes data type before write
    except ValueError:
        sys.exit('error: simultaneous byte count exceeded 256') # working with base 16 and a single byte for counting, so we cannot work with fairly large numbers
    try:
        # storage check comes first: opening with 'wb' truncates an existing file,
        # so a failed check must happen before the destination is touched
        storage(file, combineDat)
        f = open(file, 'wb')
    except FileNotFoundError:
        sys.exit('error: failed to write output file. ensure the destination directory exists.') # specified directory does not exist
    except IOError:
        sys.exit('error: failed to write output file. are permissions correctly set?') # generic i/o error -- likely permissions
    # the context manager closes the handle even when the write fails
    with f:
        f.write(combineDat)
        f.flush()
        os.fsync(f.fileno()) # push the data to disk before reporting success
    return combineDat
# compresses content from input file using run-length encoding lossless compression
def _rle_encode(data):
    """Turn an iterable of byte values into a flat list of (byte, count) pairs.

    The stored count is the number of repeats after the first occurrence, so
    it starts at 0x00. A count has to fit in one byte, so a run longer than
    256 bytes is split into several pairs for the same byte value.
    """
    encoded = []
    previous = None
    for byte in data:
        if byte == previous and encoded[-1] < 0xFF:
            encoded[-1] += 0x01 # extend the current run
        else:
            # different byte, or the counter of the current run is full: start a new pair
            encoded.append(byte)
            encoded.append(0x00)
            previous = byte
    return encoded


def compress(input, output):
    """Compress the input file with run-length encoding and write it to output.

    Reads input, encodes it as (byte, count) pairs and hands the result to
    pack. Returns the list of integers that was written.
    """
    compressionBuffer = _rle_encode(unpack(input))
    pack(output, compressionBuffer)
    return compressionBuffer
# decompresses content from input file containing run-length encoding lossless compressed data
def _rle_decode(dat):
    """Expand a flat sequence of (byte, count) pairs into a list of byte values.

    A stored count of n stands for n + 1 occurrences of the byte. dat must
    have an even length.
    """
    decoded = []
    for position in range(0, len(dat), 2):
        # even index holds the byte, the following index holds its counter
        decoded.extend([dat[position]] * (dat[position + 1] + 1))
    return decoded


def decompress(input, output):
    """Decompress run-length encoded data from input and write it to output.

    Exits the program with an error message when the input is empty or has an
    odd length. Returns the list of integers that was written.
    """
    dat = unpack(input)
    # integrity check (ensure it is not empty but is divisible by two)
    if len(dat) == 0 or len(dat) % 2 == 1:
        sys.exit('error: bad data')
    # every pair is expanded, including those of inputs with fewer than four pairs
    decompressionBuffer = _rle_decode(dat)
    pack(output, decompressionBuffer)
    return decompressionBuffer
# argument handling
# all three positional arguments are required; print the usage text otherwise
if len(sys.argv) < 4:
    fileName = sys.argv[0]
    sys.exit('syntax: ' + fileName + ' [c (compress) | d (decompress)] [input file] [output file]')
modeType, fileInput, fileOutput = sys.argv[1:4]
# modes: reject anything that is not 'c' or 'd' before doing any work
if modeType not in ('c', 'd'):
    sys.exit('error: no such mode')
if modeType == 'c':
    compress(fileInput, fileOutput)
else:
    decompress(fileInput, fileOutput)