#!/usr/bin/env python3
"""
|
|
HMAC-SHA256 authentication system for the Adif API.

Based on ElcanoAuth.java.
|
|
"""
|
|
|
|
import hashlib
import hmac
import json
from datetime import datetime, timezone
from typing import Dict, Optional, Tuple
|
|
|
|
class AdifAuthenticator:
    """Build signed HTTP headers for the Adif API using HMAC-SHA256.

    The signing flow implemented by this class is:

    1. Build a canonical request from the method, path, query string,
       canonical headers, signed-header list and the SHA-256 of the
       whitespace-stripped payload.
    2. Build a string to sign from the algorithm name, the timestamp, the
       credential scope and the SHA-256 of the canonical request.
    3. Derive a signing key by chaining HMAC-SHA256 over the date, the
       client name and the literal ``"elcano_request"``, starting from the
       secret key.
    4. HMAC the string to sign with that key and place the hex digest in
       the ``Authorization`` header.

    Attributes:
        access_key: Key identifier placed in the ``Credential`` field.
        secret_key: Secret used as the root of the signing-key derivation.
        user_id: Value sent as ``X-Elcano-UserId`` and used in the scope.
        client: Client identifier, always ``"AndroidElcanoApp"``.
    """

    # Single source of truth for values that appear both in the signed
    # material and in the emitted headers, so the two cannot drift apart.
    _ALGORITHM = "HMAC-SHA256"
    _REQUEST_TYPE = "elcano_request"
    _CONTENT_TYPE = "application/json;charset=utf-8"

    # Deletes spaces, line feeds and carriage returns in one pass.
    _PAYLOAD_STRIP_TABLE = str.maketrans("", "", " \n\r")

    def __init__(self, access_key: str, secret_key: str, user_id: str):
        """Store the credentials used to sign requests.

        Args:
            access_key: Key identifier placed in the ``Credential`` field.
            secret_key: Secret used to derive the signing key.
            user_id: User/device identifier sent with every request.
        """
        self.access_key = access_key
        self.secret_key = secret_key
        self.user_id = user_id
        self.client = "AndroidElcanoApp"

    def _format_payload(self, payload: str) -> str:
        """Return ``payload`` with every space, LF and CR removed.

        Tabs and other whitespace are left untouched.

        NOTE(review): this also removes spaces that sit inside JSON string
        values. Presumably it mirrors the reference implementation the
        server validates against -- confirm before changing it.
        """
        return payload.translate(self._PAYLOAD_STRIP_TABLE)

    def _to_hex(self, data: str) -> str:
        """Return the hexadecimal SHA-256 digest of ``data`` (UTF-8 encoded)."""
        return hashlib.sha256(data.encode("utf-8")).hexdigest()

    def _hmac_sha256(self, key: bytes, message: str) -> bytes:
        """Return the raw HMAC-SHA256 of ``message`` (UTF-8 encoded) under ``key``."""
        return hmac.new(key, message.encode("utf-8"), hashlib.sha256).digest()

    def _credential_scope(self, date_simple: str) -> str:
        """Return the ``date/client/user_id/elcano_request`` scope string."""
        return f"{date_simple}/{self.client}/{self.user_id}/{self._REQUEST_TYPE}"

    def _get_signature_key(self, date_simple: str) -> bytes:
        """Derive the signing key for the given ``YYYYMMDD`` date.

        The derivation is a chain of HMAC-SHA256 operations::

            kDate    = HMAC(secret_key, date_simple)
            kClient  = HMAC(kDate, client)
            kSigning = HMAC(kClient, "elcano_request")

        Returns:
            The raw bytes of ``kSigning``.
        """
        key = self.secret_key.encode("utf-8")
        for component in (date_simple, self.client, self._REQUEST_TYPE):
            key = self._hmac_sha256(key, component)
        return key

    def _prepare_canonical_request(self, method: str, path: str, params: str,
                                   host: str, date: str,
                                   payload: str) -> Tuple[str, str]:
        """Build the canonical request and the matching signed-header list.

        Args:
            method: HTTP method.
            path: Request path.
            params: Query string (empty when there is none).
            host: Value of the ``x-elcano-host`` header.
            date: Timestamp formatted as ``YYYYMMDDTHHMMSSZ``.
            payload: Request body; it is whitespace-stripped before hashing.

        Returns:
            A ``(canonical_request, signed_headers)`` tuple.
        """
        # Canonical headers must stay in this order; the signed-header list
        # is derived from the same sequence so both always agree.
        header_pairs = (
            ("content-type", self._CONTENT_TYPE),
            ("x-elcano-client", self.client),
            ("x-elcano-date", date),
            ("x-elcano-host", host),
            ("x-elcano-userid", self.user_id),
        )
        canonical_headers = "".join(
            f"{name}:{value}\n" for name, value in header_pairs
        )
        signed_headers = ";".join(name for name, _ in header_pairs)

        payload_hash = self._to_hex(self._format_payload(payload))

        canonical_request = (
            f"{method}\n"
            f"{path}\n"
            f"{params}\n"
            f"{canonical_headers}"
            f"{signed_headers}\n"
            f"{payload_hash}"
        )
        return canonical_request, signed_headers

    def _prepare_string_to_sign(self, canonical_request: str, date: str,
                                date_simple: str) -> str:
        """Build the string to sign from the canonical request.

        Args:
            canonical_request: Output of ``_prepare_canonical_request``.
            date: Timestamp formatted as ``YYYYMMDDTHHMMSSZ``.
            date_simple: Date formatted as ``YYYYMMDD``.

        Returns:
            Four newline-separated fields: algorithm, timestamp, credential
            scope and the SHA-256 hex digest of the canonical request.
        """
        canonical_hash = self._to_hex(canonical_request)
        return (
            f"{self._ALGORITHM}\n"
            f"{date}\n"
            f"{self._credential_scope(date_simple)}\n"
            f"{canonical_hash}"
        )

    def _calculate_signature(self, string_to_sign: str, date_simple: str) -> str:
        """Return the hex HMAC-SHA256 signature of ``string_to_sign``.

        The key is derived from the secret key for ``date_simple``.
        """
        signing_key = self._get_signature_key(date_simple)
        return self._hmac_sha256(signing_key, string_to_sign).hex()

    def sign_request(self, method: str, host: str, path: str,
                     params: str = "", payload: str = "",
                     *, now: Optional[datetime] = None) -> Dict[str, str]:
        """Sign an HTTP request and return the headers to send with it.

        Args:
            method: HTTP method (GET, POST, ...).
            host: Host, e.g. ``circulacion.api.adif.es``.
            path: Request path.
            params: Query parameters (empty if there are none).
            payload: JSON body (empty for GET).
            now: Optional signing time. Defaults to the current UTC time.
                A naive datetime is interpreted as UTC; an aware datetime
                is converted to UTC. Mainly useful for reproducible tests.

        Returns:
            Dict with every header required by the API, including
            ``Authorization``.
        """
        # Use an aware UTC timestamp: datetime.utcnow() is deprecated and
        # returns a naive value that is easy to misinterpret as local time.
        if now is None:
            now = datetime.now(timezone.utc)
        elif now.utcoffset() is None:
            now = now.replace(tzinfo=timezone.utc)
        else:
            now = now.astimezone(timezone.utc)

        date = now.strftime("%Y%m%dT%H%M%SZ")
        date_simple = now.strftime("%Y%m%d")

        canonical_request, signed_headers = self._prepare_canonical_request(
            method, path, params, host, date, payload
        )
        string_to_sign = self._prepare_string_to_sign(
            canonical_request, date, date_simple
        )
        signature = self._calculate_signature(string_to_sign, date_simple)

        authorization = (
            f"{self._ALGORITHM} "
            f"Credential={self.access_key}/{self._credential_scope(date_simple)},"
            f"SignedHeaders={signed_headers},"
            f"Signature={signature}"
        )

        return {
            "X-Elcano-Host": host,
            "Content-type": self._CONTENT_TYPE,
            "X-Elcano-Client": self.client,
            "X-Elcano-Date": date,
            "X-Elcano-UserId": self.user_id,
            "Authorization": authorization,
        }
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test using the extracted keys.
    # NOTE(review): credentials are hard-coded in source here -- confirm
    # that is acceptable for this project before publishing the file.
    authenticator = AdifAuthenticator(
        access_key="and20210615",
        secret_key="Jthjtr946RTt",
        user_id="0c8c32dce47f8512",
    )

    # Sample request body, kept as a dict until it is serialized for signing.
    request_body = {
        "stationCode": "10200",
        "commercialService": "BOTH",
        "commercialStopType": "BOTH",
        "page": {"pageNumber": 0},
        "trafficType": "CERCANIAS",
    }

    signed_headers = authenticator.sign_request(
        method="POST",
        host="circulacion.api.adif.es",
        path="/portroyalmanager/secure/circulationpaths/departures/traffictype/",
        payload=json.dumps(request_body),
    )

    # Print one "Name: value" line per generated header.
    print("Headers generados:")
    for header_name, header_value in signed_headers.items():
        print(f"{header_name}: {header_value}")
|