Cryptopals zestaw 4 ćwiczenie 31 i 32

przez | 5 maja 2021

Tym razem opiszę od razu rozwiązanie dwóch ćwiczeń w jednym wpisie, ponieważ to tak na prawdę jedno ćwiczenie, które różni się jedynie poziomem trudności. W związku z tym stwierdziłem, że od razu rozwiążę trudniejszą wersję, ponieważ metoda jest taka sama, po prostu wymaga kilku dodatkowych kroków, aby wyeliminować „szum” spowodowany losowymi opóźnieniami odpowiedzi serwera.

Naszym zadaniem będzie odgadnięcie prawidłowej wartości kodu HMAC dla dowolnej wiadomości. To co będziemy mieli do dyspozycji to serwer, do którego możemy wysłać dowolną wiadomość oraz wartość kodu, która naszym zdaniem jest prawidłowa. Jako informację zwrotną otrzymamy komunikat, że kod jest prawidłowy albo błąd, jeżeli kod jest nieprawidłowy. To co będzie charakterystyczne w zachowaniu serwera to fakt, że czas odpowiedzi jest proporcjonalny do ilości pierwszych n poprawnych bajtów HMAC i każdy kolejny prawidłowy bajt będzie wydłużał ten czas o 5ms (50ms w przypadku łatwiejszej wersji). Przykładowo, jeżeli już pierwszy bajt podanego przez nas kodu HMAC będzie niepoprawny to czas odpowiedzi będzie składał się tylko z opóźnienia wywołanego obsługą zapytania wysłanego do serwera – w dalszej części będę to nazywał „standardowe opóźnienie„. Jeżeli dopiero drugi bajt będzie niepoprawny to czas odpowiedzi będzie wynosił: standardowe opóźnienie + 1 * 5ms. W przypadku kiedy pierwsze cztery bajty są poprawne czas odpowiedzi będzie wynosił: standardowe opóźnienie + 4 * 5ms. Mówiąc krótko: na podstawie obserwacji czasu odpowiedzi serwera jesteśmy w stanie określić czy bajt ma poprawną wartość czy nie.

Wiadomo już co można zrobić i jak a teraz musimy przygotować sobie serwer, który będzie zachowywał się w sposób opisany powyżej, ale zanim do tego przejdziemy potrzebujemy implementacji HMAC, tak jak sugerują autorzy pseudo kod dostępny jest np. na Wikipedii: https://en.wikipedia.org/wiki/HMAC . Moja implementacja poniżej:

def custom_hmac(key, msg, hash_fun, block_size, hash_size):
    if len(key) > block_size:
        key = hash_fun(key)
    
    if len(key) < block_size:
        key = key + bytearray([0]*(block_size-len(key)))
    
    o_key_pad = bytes_xor(key, bytearray([0x5c]*block_size))
    i_key_pad = bytes_xor(key, bytearray([0x36]*block_size))

    return hash_fun(o_key_pad + hash_fun(i_key_pad + msg))

Implementacja serwera:

class RequestHandlerHMAC50ms(RequestHandler):
    def quick_response(self, code, msg):
        self.send_response(code)
        data = b'<head></head><body>'+msg+b'</body>'
        self.wfile.write(data)


    def do_GET_test(self, query_args):
        if ('signature' not in query_args or 'file' not in query_args):
            self.quick_response(300, b'<p>Invalid request parameters</p>')
            return

        if ('hash_key' not in self.M_CONFIG or 'hash_fun' not in self.M_CONFIG or
            'hash_len' not in self.M_CONFIG or 'hash_blocklen' not in self.M_CONFIG):
            self.quick_response(500, b'<p>Invalid server configuration</p>')
            return
        hash_key = self.M_CONFIG['hash_key']
        hash_fun = self.M_CONFIG['hash_fun']
        hash_len = self.M_CONFIG['hash_len']
        hash_blocklen = self.M_CONFIG['hash_blocklen']
        user_hash = b''
        try:
            user_hash = bytes.fromhex(query_args['signature'])
        except:
            user_hash = b''
        file_data = query_args['file'].encode('utf-8')

        if len(user_hash) != hash_len:
            self.quick_response(500, b'<p>Invalid signature</p>')
            return

        valid_hash = custom_hmac(hash_key, file_data, hash_fun, hash_blocklen, hash_len)
        
        for idx in range(hash_len):
            if user_hash[idx] != valid_hash[idx]:
                self.quick_response(500, b'<p>Invalid signature</p>')
                return
            sleep(0.005)
        self.quick_response(200, b'<p>Signature is valid</p>')


    def do_GET(self):
        path = urllib.parse.urlparse(self.path).path
        if path == '/test':
            query = urllib.parse.urlparse(self.path).query
            query_args = dict(qc.split("=") for qc in query.split("&"))
            self.do_GET_test(query_args)
        else:
            self.send_response(200)
            self.wfile.write(b'<head></head>')
            self.wfile.write(b'<body>')
            self.wfile.write(b'<p>Unkonwn command</p>')
            self.wfile.write(b'</body>')

class HttpSrvCustom():
    M_HTTPD = None
    M_SERVER_ADDRESS = None
    M_SERVING = False
    M_DEAMON = None

    def __init__(self, handler_class = RequestHandler, config = {}):
        self.M_SERVER_ADDRESS = (HOST_ADDRESS, HOST_PORT)
        handler = partial(handler_class, config)
        self.M_HTTPD = HTTPServer(self.M_SERVER_ADDRESS, handler)
        self.M_DEAMON = Thread(name='Custom HTTP Deamon', target=self.runDaemon)
        self.M_DEAMON.setDaemon(True)

    def run(self):
        self.M_DEAMON.start()

    def join(self):
        self.M_DEAMON.join()

    def runDaemon(self):
        print("Starting server at: ", self.M_HTTPD.server_address)
        self.M_HTTPD.serve_forever()
        self.M_SERVING = True

Przykładowy sposób użycia serwera:

    srv_config = {}
    srv_config['hash_key'] = generateRandomDataSize(1, 40)
    srv_config['hash_fun'] = custom_sha1
    srv_config['hash_len'] = 20
    srv_config['hash_blocklen'] = 64
    http_srv = HttpSrvCustom(RequestHandlerHMAC50ms, srv_config)
    http_srv.run()

Mamy już serwer, który działa zgodnie z założeniami. Teraz czas odgadnąć prawidłową wartość kodu HMAC. Głowna trudność to fakt, że opóźnienia w odpowiedzi serwera zależą nie tylko od ilości poprawnych bajtów w proponowanym przez nas kodzie HMAC, ale mogą też wynikać z innych powodów np. kiedy zazwyczaj odpowiedź serwera wynosi 120ms to raz na jakiś czas może ona wzrosnąć do 1300ms ponieważ serwer jest bardziej obciążony. Nasz algorytm musi filtrować takie przypadki, dodatkowo czas odpowiedzi powinien być uśredniany, żeby wyeliminować szum spowodowany różnicami w czasie odpowiedzi serwera na nasze zapytania. Ja na potrzeby rozwiązania wysyłam do serwera 10 takich samych zapytań dla każdej sprawdzanej wartości i odrzucam te wyniki, które w znaczący sposób odbiegają od spodziewanego czasu odpowiedzi. Moja implementacja rozwiązania poniżej:

def s4challenge31():
    print("Challenge 31")
    srv_config = {}
    srv_config['hash_key'] = generateRandomDataSize(1, 40)
    srv_config['hash_fun'] = custom_sha1
    srv_config['hash_len'] = 20
    srv_config['hash_blocklen'] = 64
    http_srv = HttpSrvCustom(RequestHandlerHMAC50ms, srv_config)
    http_srv.run()
    tmp_hash = bytearray([0]*20)
    part_url = 'http://192.168.0.32:8000/test?file=foo&signature='
    valid_data = False
    for i in range(20):
        if valid_data == True:
            break
        valid_byte = False
        while not valid_byte:
            if valid_data == True:
                break
            for j in range(256):
                current_timediff = 0.0
                valid_cntr = 0
                for cnt in range(10):
                    time.sleep(0.005)
                    tmp_hash[i] = j
                    full_url = part_url + tmp_hash.hex()
                    start_time = time.perf_counter()
                    try:
                        data = urllib.request.urlopen(full_url)
                        valid_data = True
                        break
                    except:
                        data = "Invalid"
                    stop_time = time.perf_counter()
                    tmp_diff = (stop_time-start_time)
                    print("\n\n\n\nSingle timediff: ", tmp_diff, " Should be:", (i+1)*0.005)
                    if (tmp_diff < (i+1)*0.0055+0.002):
                        current_timediff += tmp_diff
                        valid_cntr += 1
                
                if valid_data == True:
                    break
                if current_timediff-i*0.0055*valid_cntr > 0.00495*valid_cntr:
                    valid_byte = True
                    break
    full_url = part_url + tmp_hash.hex()
    data = urllib.request.urlopen(full_url)
    print("Data: ", data)
    print("Done")

W związku z tym, że serwer jest uruchomiony na tym samym komputerze z którego są wysyłane do niego zapytania, to standardowe opóźnienie jest bardzo małe, ja w swojej implementacji ustawiłem ją na wartość wyznaczoną eksperymentalnie można by było to zrobić lepiej i wyznaczać ją automatycznie na podstawie kilkunastu zapytań. Samo wyznaczanie poprawnej wartości jest czasochłonne ponieważ w skrajnym wypadku potrzebujemy sprawdzić 255 możliwości dla każdego bajtu w kluczu, a w przypadku mojej implementacji każda z tych wartości jest 10 razy wysyłana do serwera. Po uruchomieniu skryptu można iść na herbatę, albo np. poćwiczyć tureckie wstawanie.

Całość dostępna pod adresem https://gitlab.com/akoltys/cryptopals.

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *