Cryptopals zestaw 5 ćwiczenie 35

przez | 6 lipca 2021

Kolejne zadanie to tak właściwie kontynuacja albo rozwinięcie tego co robiliśmy ostatnio z tym, że zamiast modyfikować wartości kluczy publicznych tym razem zmieniamy wartość g, czyli liczby, która podnoszona jest do potęgi w czasie wykonywania protokołu Diffiego-Hellmana.

Naszym zadaniem jest przeprowadzenie testu i sprawdzenie trzech przypadków:
1. g = 1,
2. g = p,
3. g = p-1.
W związku z tym, że modyfikujemy wartość g, która jest przesyłana ze strony A do strony B to musimy zmodyfikować również wartość publicznego klucza A, która zostanie wysłana do strony B zgodnie z nową wartością g:
1. g = 1 => A = 1,
2. g = p => A = 0,
3. g = p – 1 => A = 1 lub A = p-1.
Przypominam, że p to publicznie znana liczba pierwsza ustalana podczas protokołu ustalania klucza szyfrującego.

Pierwszy przypadek jest bardzo prosty, ponieważ jeżeli strona B dostanie informację, że g ma wartość 1 to nieważne jaka prywatna wartość b zostanie wylosowana, to 1 podniesione do dowolnej potęgi zawsze wynosi 1 a w związku z tym publiczny klucz strony B również będzie miał wartość 1. Strona A również podniesie wartość 1 do losowej potęgi i ponownie otrzyma wartość 1 jako klucz szyfrujący.

Drugi przypadek jest równie prosty jak poprzedni. Jeżeli g będzie miało wartość p to niezależnie do jakiej zostanie potęgi po wykonaniu dzielenia modulo zawsze otrzymamy w wyniku 0. Strona A podniesie wartość 0 do losowej potęgi i również otrzyma wartość 0 jako klucz szyfrujący.

Ostatni przypadek jest ciekawszy ponieważ zmieniając wartość g na (p -1) mamy możliwe dwa rozwiązania. Jeżeli g zostanie podniesione do potęgi parzystej g[0,2,4,…,2n] % p = 1, natomiast jeżeli g zostanie podniesione do potęgi nieparzystej g[1,3,5,…,2n+1] % p = p – 1. W związku z tym w momencie preparowania publicznego klucza A musimy wybrać jedną z dwóch możliwości, jeżeli wybierzemy nieprawidłowo strony A i B będą posługiwały się innymi kluczami sesji a my będziemy musieli „tłumaczyć” zaszyfrowane dane z jednego klucza na drugi (Rozszyfrowywać dane wysyłane z A do B kluczem SA i szyfrować ponownie kluczem SB i odwrotnie w przypadku komunikacji B do A). Aby zautomatyzować proces wykrywania czy wybraliśmy prawidłową wartość klucza SA wykorzystamy fakt, że wartość która jest prawidłowa będzie posiadała prawidłowe wypełnienie (padding).

Moja implementacja poniżej:

def thread_a35_function(HOST_B, PORT_B):
    public_p = 0xffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8fd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca237327ffffffffffffffff
    public_g = 2
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_a:
        socket_a.connect((HOST_B, PORT_B))
        private_a = generate_private_key(public_p)
        public_a = generate_public_key(public_g, private_a, public_p)
        # Send p, g to B
        session_init_data = json.dumps(
            {
                "p": public_p,
                "g": public_g
            }
        )
        socket_a.sendall(str.encode(session_init_data))
        
        # Receive ACK from B
        recv_session_init_data = wait_and_recv_data(socket_a, 4*1024)
        if recv_session_init_data != b'ACK':
            print("ACK not received: ", recv_session_init_data)
            exit(-1)

        # Send public A key to B
        session_init_data = json.dumps(
            {
                "a": public_a
            }
        )
        socket_a.sendall(str.encode(session_init_data))

        # Receive public B key from B
        recv_session_init_data = wait_and_recv_data(socket_a, 4*1024)
        json_init_data = json.loads(recv_session_init_data)

        # Generate session key
        key = generate_session_key(json_init_data["b"], private_a, public_p)
        key = generate_sha1(key)
        print("A KEY HASH: ", key)

        # Send secret message to B
        socket_a.sendall(generate_encrypted_message(b'Hello darkness my old friend...', key))
        
        # Receive secret message from B
        data = wait_and_recv_data(socket_a, 4*1024)
        plain_data = decrypt_received_message(data, key)
        print("A RECEIVED MSG: ", plain_data)

def thread_b35_function(HOST_B, PORT_B):
    print("Starting thread B at: ", HOST_B, ":", PORT_B)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_b:
        socket_b.bind((HOST_B, PORT_B))
        socket_b.listen()
        conn, addr = socket_b.accept()
        with conn:
            # Receive p and g from A
            print("Connected: ", addr)
            data = wait_and_recv_data(conn, 4*1024)
            print("Received init session data: ", data)
            json_init_data = json.loads(data)
            private_b = generate_private_key(json_init_data["p"])
            public_b = generate_public_key(json_init_data["g"], private_b, json_init_data["p"])

            # Send ACK to A
            conn.sendall(b'ACK')

            # Receive public A key from A
            data = wait_and_recv_data(conn, 4*1024)
            json_init_data2 = json.loads(data)

            # Generate public key B session key
            key = generate_session_key(json_init_data2["a"], private_b, json_init_data["p"])
            print("B KEY: ", key)
            key = generate_sha1(key)
            print("B KEY HASH: ", key)

            # Send public key B to A
            session_init_response = json.dumps(
                {
                    "b": public_b
                }
            )
            conn.sendall(str.encode(session_init_response))

            # Receive secret message from A
            data = wait_and_recv_data(conn, 4*1024)
            plain_data = decrypt_received_message(data, key)
            print("B RECEIVED MSG: ", plain_data)

            # Send secret message to A
            conn.sendall(generate_encrypted_message(plain_data, key))

def thread_m35_function(HOST_M, PORT_M, HOST_B, PORT_B, G_VAL):
    print("Starting thread M at: ", HOST_M, ":", PORT_M)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_ma:
        socket_ma.bind((HOST_M, PORT_M))
        socket_ma.listen()
        conn, addr = socket_ma.accept()
        with conn:
            print("Connected: ", addr)
            data = wait_and_recv_data(conn, 4*1024)
            json_init_data = json.loads(data)
            P_VAL = json_init_data["p"]
            json_init_data_forged = json.dumps(
                {
                    "p": json_init_data["p"],
                    "g": G_VAL
                }
            )
            print("M Sending forged init data to B: ", json_init_data_forged)
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socket_mb:
                socket_mb.connect((HOST_B, PORT_B))
                
                # Send forged init data to B 
                socket_mb.sendall(str.encode(json_init_data_forged))
                
                # Receive ACK from B and forward it to A
                data = wait_and_recv_data(socket_mb, 4*1024)
                conn.sendall(data)

                # Recive public A key from A, forge it and sent to B
                data = wait_and_recv_data(conn, 4*1024)
                json_init_data2 = json.loads(data)
                key_a = 0
                key_b = 0
                if G_VAL == 1:
                    json_init_data2["a"] = 1
                    key_a = 1
                    key_b = 1
                elif G_VAL == P_VAL:
                    json_init_data2["a"] = 0
                    key_a = 0
                    key_b = 0
                else:
                    json_init_data2["a"] = 1
                    key_a = 1
                    key_b = 1
                data = json.dumps(json_init_data2)
                socket_mb.sendall(str.encode(data))

                # Receive public B key and send it to A
                data = wait_and_recv_data(socket_mb, 4*1024)
                key_a = generate_sha1(key_a)
                key_b = generate_sha1(key_b)
                print("MA KEY HASH: ", key_a)
                print("MB KEY HASH: ", key_b)
                conn.sendall(data)

                data = wait_and_recv_data(conn, 4*1024)
                plain_data, valid = decrypt_received_message_ext(data, key_a)
                if not valid:
                    key_a = generate_sha1(P_VAL-1)
                    plain_data, valid = decrypt_received_message_ext(data, key_a)
                print("MA RECEIVED MSG: ", plain_data, valid)
                data = generate_encrypted_message(plain_data, key_b)
                socket_mb.sendall(data)
                data = wait_and_recv_data(socket_mb, 4*1024)
                plain_data, valid = decrypt_received_message_ext(data, key_b)
                print("MB RECEIVED MSG: ", plain_data, valid)
                data = generate_encrypted_message(plain_data, key_a)
                conn.sendall(data)

def s5challenge35_part1():
    print("Challenge 35 par1 (g = 1)")
    HOST_B  = '127.0.0.1'
    HOST_M  = '127.0.0.1'
    PORT_B = 40002
    PORT_M = 40001
    thread_b_handle = threading.Thread(target=thread_b35_function, args=(HOST_B, PORT_B))
    thread_m_handle = threading.Thread(target=thread_m35_function, args=(HOST_M, PORT_M, HOST_B, PORT_B, 1))
    thread_m_handle.start()
    thread_b_handle.start()
    sleep(1)
    thread_a35_function(HOST_M, PORT_M)
    thread_m_handle.join()
    thread_b_handle.join()

def s5challenge35_part2():
    print("Challenge 35 par2 (g = p)")
    P_VAL = 0xffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8fd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca237327ffffffffffffffff
    HOST_B  = '127.0.0.1'
    HOST_M  = '127.0.0.1'
    PORT_B = 40002
    PORT_M = 40001
    thread_b_handle = threading.Thread(target=thread_b35_function, args=(HOST_B, PORT_B))
    thread_m_handle = threading.Thread(target=thread_m35_function, args=(HOST_M, PORT_M, HOST_B, PORT_B, P_VAL))
    thread_m_handle.start()
    thread_b_handle.start()
    sleep(1)
    thread_a35_function(HOST_M, PORT_M)
    thread_m_handle.join()
    thread_b_handle.join()


def s5challenge35_part3():
    print("Challenge 35 par3 (g = p-1)")
    print("Challenge 35 par2 (g = p)")
    P_VAL = 0xffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8fd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca237327ffffffffffffffff
    HOST_B  = '127.0.0.1'
    HOST_M  = '127.0.0.1'
    PORT_B = 40002
    PORT_M = 40001
    thread_b_handle = threading.Thread(target=thread_b35_function, args=(HOST_B, PORT_B))
    thread_m_handle = threading.Thread(target=thread_m35_function, args=(HOST_M, PORT_M, HOST_B, PORT_B, P_VAL-1))
    thread_m_handle.start()
    thread_b_handle.start()
    sleep(1)
    thread_a35_function(HOST_M, PORT_M)
    thread_m_handle.join()
    thread_b_handle.join()

def s5challenge35():
    print("Challenge 35")
    #s5challenge35_part1()
    #s5challenge35_part2()
    s5challenge35_part3()

Całość dostępna w repozytorium: https://gitlab.com/akoltys/cryptopals .

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *