Skip to main content
This guide explains how to securely decrypt signed and encrypted messages received through a webhook using the CryptUtility class.

Overview

When working with sensitive data in webhooks, it’s important to verify the authenticity and decrypt the content properly. This process involves:
  1. Receiving an encrypted payload
  2. Loading your private key for decryption
  3. Loading the sender’s public key for verification
  4. Verifying and decrypting the message
  5. Processing the decrypted data

Prerequisites

  1. Your private key (for decryption)
  2. Sender public key (for verification)

Implementation Example

The CryptUtility Class Implementation

Below is the implementation of the CryptUtility class that handles the cryptographic operations:
CryptUtility.py
import base64
from typing import Union, Dict, Tuple, Optional, Any
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
from cryptography.exceptions import InvalidSignature
from cryptography.fernet import Fernet
import json


class CryptUtility:
    def __init__(self):
        self.private_key = None
        self.public_key = None

    def load_private_key(self, private_key_data: Union[str, bytes], password: Optional[Union[str, bytes]] = None) -> Any:
        """
        Load a private key from PEM formatted data.

        Args:
            private_key_data: PEM encoded private key data
            password: Optional password if the key is encrypted

        Returns:
            The loaded private key object
        """
        if isinstance(private_key_data, str):
            private_key_data = private_key_data.encode()

        self.private_key = serialization.load_pem_private_key(
            private_key_data,
            password=password,
            backend=default_backend()
        )
        self.public_key = self.private_key.public_key()
        return self.private_key

    def load_public_key(self, public_key_data: Union[str, bytes]) -> Any:
        """
        Load a public key from PEM formatted data.

        Args:
            public_key_data: PEM encoded public key data

        Returns:
            The loaded public key object
        """
        if isinstance(public_key_data, str):
            public_key_data = public_key_data.encode()

        self.public_key = serialization.load_pem_public_key(
            public_key_data,
            backend=default_backend()
        )
        return self.public_key

    def decrypt(self, encrypted_package: Union[str, bytes], private_key: Any = None) -> str:
        """
        Decrypt an encrypted package using hybrid encryption.

        Args:
            encrypted_package: The encrypted package containing the encrypted key and message
            private_key: The private key to use for decryption (uses self.private_key if None)

        Returns:
            The decrypted message as a string
        """
        if isinstance(encrypted_package, str):
            package = json.loads(base64.b64decode(
                encrypted_package).decode('utf-8'))
        else:
            package = json.loads(
                encrypted_package.decode('utf-8'))

        key_to_use = private_key if private_key is not None else self.private_key
        if key_to_use is None:
            raise ValueError(
                "No private key available. Generate or load one first.")

        encrypted_key = base64.b64decode(package["key"])
        encrypted_message = base64.b64decode(package["message"])

        key = key_to_use.decrypt(
            encrypted_key,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )

        f = Fernet(key)
        decrypted_message = f.decrypt(encrypted_message)

        return decrypted_message.decode('utf-8')

    def verify(self, message: Union[str, bytes], signature: Union[str, bytes], public_key: Any = None) -> bool:
        """
        Verify a signature for a message.

        Args:
            message: The message to verify
            signature: The signature to verify
            public_key: The public key to use for verification (uses self.public_key if None)

        Returns:
            True if the signature is valid, False otherwise
        """
        if isinstance(message, str):
            message = message.encode()
        if isinstance(signature, str):
            signature = base64.b64decode(signature)

        key = public_key if public_key is not None else self.public_key
        if key is None:
            raise ValueError(
                "No public key available. Generate or load one first.")

        try:
            key.verify(
                signature,
                message,
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH
                ),
                hashes.SHA256()
            )
            return True
        except InvalidSignature:
            return False

    def verify_secure_message(self, secure_message: Dict[str, str], sender_public_key: Any) -> Tuple[str, bool]:
        """
        Verify and decrypt a secure message.

        Args:
            secure_message: The secure message package with ciphertext and signature
            sender_public_key: The sender's public key

        Returns:
            tuple: (decrypted_message, is_authentic)
        """
        if self.private_key is None:
            raise ValueError("Private key required to decrypt the message")

        decrypted_message = self.decrypt(secure_message["encrypted_message"])

        is_authentic = self.verify(
            decrypted_message,
            secure_message["signature"],
            public_key=sender_public_key
        )

        return decrypted_message, is_authentic

Simple Decryption Function

Here’s a straightforward example function for decrypting and verifying webhook messages:
DescryptionFunction.py
import json
import base64
from typing import Dict, Any
from cryptutility import CryptUtility  # Import your CryptUtility class

def decrypt_webhook_message(request_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Decrypt and verify a secure message received from a webhook.

    Args:
        request_data: The JSON data received from the webhook

    Returns:
        Dict containing the decrypted data if verification succeeds

    Raises:
        ValueError: If required fields are missing or verification fails
    """
    try:
        # Extract the message payload
        if not request_data.get('message'):
            raise ValueError("Message field is required in the request")

        message = request_data['message']

        # Create the signed payload structure
        signed_payload = {
            "encrypted_message": message.get("encrypted_message"),
            "signature": message.get("signature")
        }

        # Validate payload structure
        if not signed_payload["encrypted_message"] or not signed_payload["signature"]:
            raise ValueError("Encrypted message and signature are required")

        # Load your private key (in a real application, load these securely from environment/config)
        YOUR_PRIVATE_KEY = "YOUR_PRIVATE_KEY"  # Base64 encoded PEM
        SENDER_PUBLIC_KEY = "SENDER_PUBLIC_KEY"  # Base64 encoded PEM

        # Initialize the crypto utilities
        receiver_crypto = CryptUtility()
        receiver_crypto.load_private_key(base64.b64decode(YOUR_PRIVATE_KEY))

        sender_crypto = CryptUtility()
        sender_crypto.load_public_key(base64.b64decode(SENDER_PUBLIC_KEY))

        # Decrypt and verify the message
        decrypted_message, is_authentic = receiver_crypto.verify_secure_message(
            secure_message=signed_payload,
            sender_public_key=sender_crypto.public_key
        )

        # Check authenticity before processing
        if not is_authentic:
            raise ValueError("Message signature verification failed")

        # Parse the decrypted JSON data
        decrypted_data = json.loads(decrypted_message)

        print("Successfully decrypted and verified webhook message")
        return decrypted_data

    except Exception as e:
        print(f"Error processing webhook: {str(e)}")
        raise

Usage Example

Here’s how you might use this function in a webhook handler:
ExampleUsage.py
from flask import Flask, request, jsonify
from decryptionfunction import decrypt_webhook_message  # Import your decrypt_webhook_message function

app = Flask(__name__)

@app.route('/webhook', methods=['POST'])
def webhook_handler():
    try:
        # Get the JSON data from the request
        request_data = request.get_json()

        # Decrypt and verify the message
        decrypted_data = decrypt_webhook_message(request_data)

        # Process the decrypted data
        process_webhook_data(decrypted_data)

        # Return a success response
        return jsonify({"status": "success", "message": "Webhook received successfully"}), 200

    except ValueError as e:
        # Handle validation errors
        return jsonify({"status": "error", "message": str(e)}), 400

    except Exception as e:
        # Handle other errors
        return jsonify({"status": "error", "message": "An error occurred processing the webhook"}), 500

def process_webhook_data(data):
    """Process the decrypted webhook data"""
    # Implement your business logic here
    print(f"Processing webhook data: {data}")

Understanding the Message Format

The webhook payload is expected to contain a message with two main components:
  1. encrypted_message: The actual message content, encrypted using a hybrid encryption approach:
    • A symmetric key is generated and used to encrypt the message
    • The symmetric key is encrypted with the recipient’s public key
    • Both are packaged together in a structured format
  2. signature: A cryptographic signature created by the sender to verify authenticity:
    • The signature is created by signing the decrypted message with the sender’s private key
    • The recipient verifies this signature using the sender’s public key

Decrypted Message Structure

After successfully decrypting and verifying the webhook message, the decrypted content will contain data similar to what is returned by the Get Script endpoint response. For detailed information about the expected data structure, refer to: Script Response Data Payload
example_decrypted_data.json
{
  "message": "Script details",
  "data": {
    "script_id": 1,
    "reference_id": "1",
    "date_created": "2025-02-09T01:55:47Z",
    "status": "processed",
    "script_data": {
      "doctor_first_name": "John",
      "doctor_middle_name": null,
      "doctor_last_name": "Doe",
      "npi": "0123456789",
      "medications": [
        {
          "medication_name": "Ubrelvy",
          "medication_strength": "100 mg",
          "medication_form": "Oral tablet",
          "quantity": "27",
          "sig": "Take one tablet as needed for acute migraine. Maximum 1 tablet a day.",
          "daw_code": "DAW 1",
          "refills": "1",
          "potency_unit_code": "mg",
          "unit_total_strength": "2 mg",
          "unit_total_volume": "3 mL"
        }
      ],
      "allergies": "Penicillin, Aspirin",
      "written_date": "2024/11/22",
      "authenticity": {
        "score": 10,
        "interpretation": "Highly authentic - All critical elements present and doctor validated",
        "details": {
          "medications": "1/1 medications complete",
          "written_date": "Valid written date present",
          "doctor": "Doctor validated through NPI",
          "signature": "Signature validation skipped",
          "ndc": "Valid NDC present"
        }
      }
    },
    "original_metadata": {
      "medications": [
        {
          "medication": "Ubrelvy 100 mg oral tablet",
          "quantity": "27",
          "sig": "1 tab as needed for acute migraine. Max 1 tab a day.",
          "refills": "1"
        }
      ]
    }
  }
}