Cryptopals zestaw 5 ćwiczenie 34

przez | 18 czerwca 2021

W poprzednim ćwiczeniu poznaliśmy zasadę działania algorytmu Diffiego-Hellmanaa teraz musimy odtworzyć scenariusz ataku typu „man-in-the-middle” ze wstrzykiwaniem parametru. Brzmi dość skomplikowanie, ale zasada działania jest bardzo prosta.

Atak typu man-in-the-middle (w dalszej części opisu będę posługiwał się skrótem MITM) polega na tym, że strony nie komunikują się ze sobą bezpośrednio i nie są tego świadome. Jeżeli komuś uda się sprawić, żeby cała komunikacja odbywała się przez „pośrednika”, którego da się kontrolować to można np. podsłuchiwać przesyłane informacje, pomimo tego, że są zaszyfrowane.

Opisany w zadaniu atak, polega na przekazaniu obydwu stronom kluczy publicznych o wartości równej p (czyli bardzo dużej liczbie pierwszej, która to wartość jest informacją publiczną). Załóżmy, że strony które się ze sobą komunikują to StronaA i StronaB. Po takiej zakłóconej wymianie dysponują one następującym zestawem informacji:
StronaA:
p – liczba pierwsza – informacja publiczna,
g – podstawa – informacja publiczna,
a – losowa wartość mniejsza od p – informacja prywatna,
A – ga – informacja publiczna,
B’ – spreparowana wartość klucza publicznego równa p – informacja publiczna.
StronaB:
p – liczba pierwsza – informacja publiczna,
g – podstawa – informacja publiczna,
b – losowa wartość mniejsza od p – informacja prywatna,
B – gb – informacja publiczna,
A’ – spreparowana wartość klucza publicznego równa p – informacja publiczna.

Kiedy strona A oblicza wartość klucza sesji otrzyma:
Sa = B’a % p.

W przypadku strony B:
Sb = A’b % p.

Wartości Sa oraz Sb są równe i wynoszą 0. Jest tak, ponieważ wartość p podniesiona do dowolnej potęgi po wykonaniu dzielenia modulo p da w wyniku 0. Jeżeli ktoś kto pośredniczy w komunikacji pomiędzy stronami A i B podmienia wartości kluczy publicznych tak, żeby obydwie strony myślały, że wynoszą one p dla drugiej strony można od razy odgadnąć jaka będzie wartość klucza sesji.

Moja implementacja jest rozbita na dwie części, w pierwszej części przeprowadzona jest niezakłócona komunikacja a w części drugiej, pośrednik nadpisuje klucze publiczne i dzięki temu jest w stanie podsłuchiwać wymieniane wiadomości.

def thread_b_function(HOST_B, PORT_B):
    print("Starting thread B at: ", HOST_B, ":", PORT_B)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_b:
        socket_b.bind((HOST_B, PORT_B))
        socket_b.listen()
        conn, addr = socket_b.accept()
        with conn:
            print("Connected: ", addr)
            data = wait_and_recv_data(conn, 4*1024)
            print("Received init session data: ", data)
            json_init_data = json.loads(data)
            private_b = generate_private_key(json_init_data["p"])
            public_b = generate_public_key(json_init_data["g"], private_b, json_init_data["p"])
            key = generate_session_key(json_init_data["a"], private_b, json_init_data["p"])
            session_init_response = json.dumps(
                {
                    "b": public_b
                }
            )
            print("B KEY: ", key)
            key = generate_sha1(key)
            print("B KEY HASH: ", key)
            conn.sendall(str.encode(session_init_response))
            data = wait_and_recv_data(conn, 4*1024)
            plain_data = decrypt_received_message(data, key)
            print("B RECEIVED MSG: ", plain_data)
            conn.sendall(generate_encrypted_message(plain_data, key))

def thread_m_function(HOST_M, PORT_M, HOST_B, PORT_B):
    print("Starting thread M at: ", HOST_M, ":", PORT_M)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_ma:
        socket_ma.bind((HOST_M, PORT_M))
        socket_ma.listen()
        conn, addr = socket_ma.accept()
        with conn:
            print("Connected: ", addr)
            data = wait_and_recv_data(conn, 4*1024)
            json_init_data = json.loads(data)
            json_init_data_forged = json.dumps(
                {
                    "p": json_init_data["p"],
                    "g": json_init_data["g"],
                    "a": json_init_data["p"]
                }
            )
            print("M Sending forged init data to B: ", json_init_data_forged)
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_mb:
                socket_mb.connect((HOST_B, PORT_B))
                socket_mb.sendall(str.encode(json_init_data_forged))
                data = wait_and_recv_data(socket_mb, 4*1024)
                json_init_resp_data = json.loads(data)
                data_forged = json.dumps(
                    {
                        "b": json_init_data["p"]
                    }
                )
                key_a = 0
                key_b = 0
                key_a = generate_sha1(key_a)
                key_b = generate_sha1(key_b)
                print("MA KEY HASH: ", key_a)
                print("MB KEY HASH: ", key_b)
                print("Sending forged init data resp to A")
                conn.sendall(str.encode(data_forged))
                data = wait_and_recv_data(conn, 4*1024)
                plain_data = decrypt_received_message(data, key_a)
                print("MA RECEIVED MSG: ", plain_data)
                data = generate_encrypted_message(plain_data, key_b)
                socket_mb.sendall(data)
                data = wait_and_recv_data(socket_mb, 4*1024)
                plain_data = decrypt_received_message(data, key_b)
                print("MB RECEIVED MSG: ", plain_data)
                data = generate_encrypted_message(plain_data, key_a)
                conn.sendall(data)


def s5challenge34_part1():
    HOST_B  = '127.0.0.1'
    PORT_B = 40002
    public_p = 0xffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8fd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca237327ffffffffffffffff
    public_g = 2
    thread_b_handle = threading.Thread(target=thread_b_function, args=(HOST_B, PORT_B))
    thread_b_handle.start()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_a:
        socket_a.connect((HOST_B, PORT_B))
        private_a = generate_private_key(public_p)
        public_a = generate_public_key(public_g, private_a, public_p)
        session_init_data = json.dumps(
            {
                "p": public_p,
                "g": public_g,
                "a": public_a
            }
        )
        socket_a.sendall(str.encode(session_init_data))
        recv_session_init_data = wait_and_recv_data(socket_a, 4*1024)#socket_a.recv(4*1024)
        json_init_data = json.loads(recv_session_init_data)
        key = generate_session_key(json_init_data["b"], private_a, public_p)
        print("A KEY: ", key)
        key = generate_sha1(key)
        print("A KEY HASH: ", key)
        socket_a.sendall(generate_encrypted_message(b'Hello darkness my old friend...', key))
        data = wait_and_recv_data(socket_a, 4*1024)#socket_a.recv(4*1024)
        plain_data = decrypt_received_message(data, key)
        print("A RECEIVED MSG: ", plain_data)
    thread_b_handle.join()

def s5challenge34_part2():
    HOST_B  = '127.0.0.1'
    HOST_M  = '127.0.0.1'
    PORT_B = 40002
    PORT_M = 40001
    public_p = 0xffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8fd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca237327ffffffffffffffff
    public_g = 2
    thread_b_handle = threading.Thread(target=thread_b_function, args=(HOST_B, PORT_B))
    thread_m_handle = threading.Thread(target=thread_m_function, args=(HOST_M, PORT_M, HOST_B, PORT_B))
    thread_m_handle.start()
    thread_b_handle.start()
    sleep(1)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_a:
        socket_a.connect((HOST_M, PORT_M))
        private_a = generate_private_key(public_p)
        public_a = generate_public_key(public_g, private_a, public_p)
        session_init_data = json.dumps(
            {
                "p": public_p,
                "g": public_g,
                "a": public_a
            }
        )
        socket_a.sendall(str.encode(session_init_data))
        recv_session_init_data = b''
        while len(recv_session_init_data) == 0:
            recv_session_init_data = socket_a.recv(4*1024)
        json_init_data = json.loads(recv_session_init_data)
        key = generate_session_key(json_init_data["b"], private_a, public_p)
        key = generate_sha1(key)
        print("A KEY HASH: ", key)
        socket_a.sendall(generate_encrypted_message(b'Hello darkness my old friend...', key))
        data = wait_and_recv_data(socket_a, 4*1024)#socket_a.recv(4*1024)
        plain_data = decrypt_received_message(data, key)
        print("A RECEIVED MSG: ", plain_data)
    thread_m_handle.join()
    thread_b_handle.join()

def s5challenge34():
    print("Challenge 34")
    #s5challenge34_part1()
    s5challenge34_part2()

Całość dostępna w repozytorium: https://gitlab.com/akoltys/cryptopals .

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *