Kolejne zadanie to tak właściwie kontynuacja albo rozwinięcie tego co robiliśmy ostatnio z tym, że zamiast modyfikować wartości kluczy publicznych tym razem zmieniamy wartość g, czyli liczby, która podnoszona jest do potęgi w czasie wykonywania protokołu Diffiego-Hellmana.
Naszym zadaniem jest przeprowadzenie testu i sprawdzenie trzech przypadków:
1. g = 1,
2. g = p,
3. g = p-1.
W związku z tym, że modyfikujemy wartość g, która jest przesyłana ze strony A do strony B to musimy zmodyfikować również wartość publicznego klucza A, która zostanie wysłana do strony B zgodnie z nową wartością g:
1. g = 1 => A = 1,
2. g = p => A = 0,
3. g = p – 1 => A = 1 lub A = p-1.
Przypominam, że p to publicznie znana liczba pierwsza ustalana podczas protokołu ustalania klucza szyfrującego.
Pierwszy przypadek jest bardzo prosty, ponieważ jeżeli strona B dostanie informację, że g ma wartość 1 to nieważne jaka prywatna wartość b zostanie wylosowana, to 1 podniesione do dowolnej potęgi zawsze wynosi 1 a w związku z tym publiczny klucz strony B również będzie miał wartość 1. Strona A również podniesie wartość 1 do losowej potęgi i ponownie otrzyma wartość 1 jako klucz szyfrujący.
Drugi przypadek jest równie prosty jak poprzedni. Jeżeli g będzie miało wartość p to niezależnie do jakiej zostanie potęgi po wykonaniu dzielenia modulo zawsze otrzymamy w wyniku 0. Strona A podniesie wartość 0 do losowej potęgi i również otrzyma wartość 0 jako klucz szyfrujący.
Ostatni przypadek jest ciekawszy ponieważ zmieniając wartość g na (p -1) mamy możliwe dwa rozwiązania. Jeżeli g zostanie podniesione do potęgi parzystej g[0,2,4,…,2n] % p = 1, natomiast jeżeli g zostanie podniesione do potęgi nieparzystej g[1,3,5,…,2n+1] % p = p – 1. W związku z tym w momencie preparowania publicznego klucza A musimy wybrać jedną z dwóch możliwości, jeżeli wybierzemy nieprawidłowo strony A i B będą posługiwały się innymi kluczami sesji a my będziemy musieli „tłumaczyć” zaszyfrowane dane z jednego klucza na drugi (Rozszyfrowywać dane wysyłane z A do B kluczem SA i szyfrować ponownie kluczem SB i odwrotnie w przypadku komunikacji B do A). Aby zautomatyzować proces wykrywania czy wybraliśmy prawidłową wartość klucza SA wykorzystamy fakt, że wartość która jest prawidłowa będzie posiadała prawidłowe wypełnienie (padding).
Moja implementacja poniżej:
def thread_a35_function(HOST_B, PORT_B):
public_p = 0xffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8fd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca237327ffffffffffffffff
public_g = 2
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_a:
socket_a.connect((HOST_B, PORT_B))
private_a = generate_private_key(public_p)
public_a = generate_public_key(public_g, private_a, public_p)
# Send p, g to B
session_init_data = json.dumps(
{
"p": public_p,
"g": public_g
}
)
socket_a.sendall(str.encode(session_init_data))
# Receive ACK from B
recv_session_init_data = wait_and_recv_data(socket_a, 4*1024)
if recv_session_init_data != b'ACK':
print("ACK not received: ", recv_session_init_data)
exit(-1)
# Send public A key to B
session_init_data = json.dumps(
{
"a": public_a
}
)
socket_a.sendall(str.encode(session_init_data))
# Receive public B key from B
recv_session_init_data = wait_and_recv_data(socket_a, 4*1024)
json_init_data = json.loads(recv_session_init_data)
# Generate session key
key = generate_session_key(json_init_data["b"], private_a, public_p)
key = generate_sha1(key)
print("A KEY HASH: ", key)
# Send secret message to B
socket_a.sendall(generate_encrypted_message(b'Hello darkness my old friend...', key))
# Receive secret message from B
data = wait_and_recv_data(socket_a, 4*1024)
plain_data = decrypt_received_message(data, key)
print("A RECEIVED MSG: ", plain_data)
def thread_b35_function(HOST_B, PORT_B):
print("Starting thread B at: ", HOST_B, ":", PORT_B)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_b:
socket_b.bind((HOST_B, PORT_B))
socket_b.listen()
conn, addr = socket_b.accept()
with conn:
# Receive p and g from A
print("Connected: ", addr)
data = wait_and_recv_data(conn, 4*1024)
print("Received init session data: ", data)
json_init_data = json.loads(data)
private_b = generate_private_key(json_init_data["p"])
public_b = generate_public_key(json_init_data["g"], private_b, json_init_data["p"])
# Send ACK to A
conn.sendall(b'ACK')
# Receive public A key from A
data = wait_and_recv_data(conn, 4*1024)
json_init_data2 = json.loads(data)
# Generate public key B session key
key = generate_session_key(json_init_data2["a"], private_b, json_init_data["p"])
print("B KEY: ", key)
key = generate_sha1(key)
print("B KEY HASH: ", key)
# Send public key B to A
session_init_response = json.dumps(
{
"b": public_b
}
)
conn.sendall(str.encode(session_init_response))
# Receive secret message from A
data = wait_and_recv_data(conn, 4*1024)
plain_data = decrypt_received_message(data, key)
print("B RECEIVED MSG: ", plain_data)
# Send secret message to A
conn.sendall(generate_encrypted_message(plain_data, key))
def thread_m35_function(HOST_M, PORT_M, HOST_B, PORT_B, G_VAL):
print("Starting thread M at: ", HOST_M, ":", PORT_M)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_ma:
socket_ma.bind((HOST_M, PORT_M))
socket_ma.listen()
conn, addr = socket_ma.accept()
with conn:
print("Connected: ", addr)
data = wait_and_recv_data(conn, 4*1024)
json_init_data = json.loads(data)
P_VAL = json_init_data["p"]
json_init_data_forged = json.dumps(
{
"p": json_init_data["p"],
"g": G_VAL
}
)
print("M Sending forged init data to B: ", json_init_data_forged)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_mb:
socket_mb.connect((HOST_B, PORT_B))
# Send forged init data to B
socket_mb.sendall(str.encode(json_init_data_forged))
# Receive ACK from B and forward it to A
data = wait_and_recv_data(socket_mb, 4*1024)
conn.sendall(data)
# Recive public A key from A, forge it and sent to B
data = wait_and_recv_data(conn, 4*1024)
json_init_data2 = json.loads(data)
key_a = 0
key_b = 0
if G_VAL == 1:
json_init_data2["a"] = 1
key_a = 1
key_b = 1
elif G_VAL == P_VAL:
json_init_data2["a"] = 0
key_a = 0
key_b = 0
else:
json_init_data2["a"] = 1
key_a = 1
key_b = 1
data = json.dumps(json_init_data2)
socket_mb.sendall(str.encode(data))
# Receive public B key and send it to A
data = wait_and_recv_data(socket_mb, 4*1024)
key_a = generate_sha1(key_a)
key_b = generate_sha1(key_b)
print("MA KEY HASH: ", key_a)
print("MB KEY HASH: ", key_b)
conn.sendall(data)
data = wait_and_recv_data(conn, 4*1024)
plain_data, valid = decrypt_received_message_ext(data, key_a)
if not valid:
key_a = generate_sha1(P_VAL-1)
plain_data, valid = decrypt_received_message_ext(data, key_a)
print("MA RECEIVED MSG: ", plain_data, valid)
data = generate_encrypted_message(plain_data, key_b)
socket_mb.sendall(data)
data = wait_and_recv_data(socket_mb, 4*1024)
plain_data, valid = decrypt_received_message_ext(data, key_b)
print("MB RECEIVED MSG: ", plain_data, valid)
data = generate_encrypted_message(plain_data, key_a)
conn.sendall(data)
def s5challenge35_part1():
print("Challenge 35 par1 (g = 1)")
HOST_B = '127.0.0.1'
HOST_M = '127.0.0.1'
PORT_B = 40002
PORT_M = 40001
thread_b_handle = threading.Thread(target=thread_b35_function, args=(HOST_B, PORT_B))
thread_m_handle = threading.Thread(target=thread_m35_function, args=(HOST_M, PORT_M, HOST_B, PORT_B, 1))
thread_m_handle.start()
thread_b_handle.start()
sleep(1)
thread_a35_function(HOST_M, PORT_M)
thread_m_handle.join()
thread_b_handle.join()
def s5challenge35_part2():
print("Challenge 35 par2 (g = p)")
P_VAL = 0xffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8fd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca237327ffffffffffffffff
HOST_B = '127.0.0.1'
HOST_M = '127.0.0.1'
PORT_B = 40002
PORT_M = 40001
thread_b_handle = threading.Thread(target=thread_b35_function, args=(HOST_B, PORT_B))
thread_m_handle = threading.Thread(target=thread_m35_function, args=(HOST_M, PORT_M, HOST_B, PORT_B, P_VAL))
thread_m_handle.start()
thread_b_handle.start()
sleep(1)
thread_a35_function(HOST_M, PORT_M)
thread_m_handle.join()
thread_b_handle.join()
def s5challenge35_part3():
print("Challenge 35 par3 (g = p-1)")
print("Challenge 35 par2 (g = p)")
P_VAL = 0xffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8fd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca237327ffffffffffffffff
HOST_B = '127.0.0.1'
HOST_M = '127.0.0.1'
PORT_B = 40002
PORT_M = 40001
thread_b_handle = threading.Thread(target=thread_b35_function, args=(HOST_B, PORT_B))
thread_m_handle = threading.Thread(target=thread_m35_function, args=(HOST_M, PORT_M, HOST_B, PORT_B, P_VAL-1))
thread_m_handle.start()
thread_b_handle.start()
sleep(1)
thread_a35_function(HOST_M, PORT_M)
thread_m_handle.join()
thread_b_handle.join()
def s5challenge35():
print("Challenge 35")
#s5challenge35_part1()
#s5challenge35_part2()
s5challenge35_part3()
Całość dostępna w repozytorium: https://gitlab.com/akoltys/cryptopals .