12345678910111213141516171819202122232425262728293031323334353637383940414243 |
- import dataclasses
- import pytest
- from hivemind.dht import get_dht_time
- from hivemind.dht.crypto import RSASignatureValidator
- from hivemind.dht.validation import DHTRecord
- def test_rsa_signature_validator():
- receiver_validator = RSASignatureValidator()
- sender_validator = RSASignatureValidator()
- mallory_validator = RSASignatureValidator()
- plain_record = DHTRecord(key=b'key', subkey=b'subkey', value=b'value',
- expiration_time=get_dht_time() + 10)
- protected_records = [
- dataclasses.replace(plain_record,
- key=plain_record.key + sender_validator.ownership_marker),
- dataclasses.replace(plain_record,
- subkey=plain_record.subkey + sender_validator.ownership_marker),
- ]
- # test 1: Non-protected record (no signature added)
- assert sender_validator.sign_value(plain_record) == plain_record.value
- assert receiver_validator.validate(plain_record)
- # test 2: Correct signatures
- signed_records = [dataclasses.replace(record, value=sender_validator.sign_value(record))
- for record in protected_records]
- for record in signed_records:
- assert receiver_validator.validate(record)
- assert receiver_validator.strip_value(record) == b'value'
- # test 3: Invalid signatures
- signed_records = protected_records # Without signature
- signed_records += [dataclasses.replace(record,
- value=record.value + b'[signature:INVALID_BYTES]')
- for record in protected_records] # With invalid signature
- signed_records += [dataclasses.replace(record, value=mallory_validator.sign_value(record))
- for record in protected_records] # With someone else's signature
- for record in signed_records:
- assert not receiver_validator.validate(record)
|