[libvirt PATCH 09/12] tools: support generating SEV secret injection tables

Daniel P. Berrangé berrange at redhat.com
Fri Oct 7 11:43:04 UTC 2022


It is possible to build OVMF for SEV with an embedded Grub that can
fetch LUKS disk secrets. This adds support for injecting secrets in
the required format.

Signed-off-by: Daniel P. Berrangé <berrange at redhat.com>
---
 docs/manpages/virt-qemu-sev-validate.rst |  66 ++++++++++
 tools/virt-qemu-sev-validate.py          | 155 +++++++++++++++++++++--
 2 files changed, 212 insertions(+), 9 deletions(-)

diff --git a/docs/manpages/virt-qemu-sev-validate.rst b/docs/manpages/virt-qemu-sev-validate.rst
index fcc13d68c8..7542bea9aa 100644
--- a/docs/manpages/virt-qemu-sev-validate.rst
+++ b/docs/manpages/virt-qemu-sev-validate.rst
@@ -187,6 +187,29 @@ understand any configuration mistakes that have been made. If the
 will be skipped. The result is that the validation will likely be reported as
 failed.
 
+Secret injection options
+------------------------
+
+These options provide a way to inject a secret if validation of the
+launch measurement passes.
+
+``--disk-password PATH``
+
+Path to a file containing the password to use to unlock the LUKS container
+for the guest disk.
+
+``--secret-header PATH``
+
+Path to a file to which the secret header will be written in base64
+format, for later injection into the domain. This is required if there is
+no connection to libvirt; otherwise the secret will be injected directly.
+
+``--secret-payload PATH``
+
+Path to a file to which the secret payload will be written in base64
+format, for later injection into the domain. This is required if there is
+no connection to libvirt; otherwise the secret will be injected directly.
+
 EXAMPLES
 ========
 
@@ -261,6 +284,26 @@ automatically constructed VMSA:
        --build-id 13 \
        --policy 7
 
+Validate the measurement of a SEV guest booting from disk and
+inject a disk password on success:
+
+::
+
+   # virt-qemu-sev-validate \
+       --loader OVMF.sev.fd \
+       --tk this-guest-tk.bin \
+       --measurement Zs2pf19ubFSafpZ2WKkwquXvACx9Wt/BV+eJwQ/taO8jhyIj/F8swFrybR1fZ2ID \
+       --api-major 0 \
+       --api-minor 24 \
+       --build-id 13 \
+       --policy 3 \
+       --disk-password passwd.txt \
+       --secret-header secret-header.b64 \
+       --secret-payload secret-payload.b64
+
+The ``secret-header.b64`` and ``secret-payload.b64`` files can now be sent to
+the virtualization host for injection.
+
 Fetch from remote libvirt
 -------------------------
 
@@ -321,6 +364,18 @@ automatically constructed VMSA:
        --tk this-guest-tk.bin \
        --domain fedora34x86_64
 
+Validate the measurement of a SEV guest booting from disk and
+inject a disk password on success:
+
+::
+
+   # virt-qemu-sev-validate \
+       --connect qemu+ssh://root@some.remote.host/system \
+       --loader OVMF.sev.fd \
+       --tk this-guest-tk.bin \
+       --domain fedora34x86_64 \
+       --disk-password passwd.txt
+
 Fetch from local libvirt
 ------------------------
 
@@ -371,6 +426,17 @@ automatically constructed VMSA:
        --tk this-guest-tk.bin \
        --domain fedora34x86_64
 
+Validate the measurement of a SEV guest booting from disk and
+inject a disk password on success:
+
+::
+
+   # virt-qemu-sev-validate \
+       --insecure \
+       --tk this-guest-tk.bin \
+       --domain fedora34x86_64 \
+       --disk-password passwd.txt
+
 EXIT STATUS
 ===========
 
diff --git a/tools/virt-qemu-sev-validate.py b/tools/virt-qemu-sev-validate.py
index 5da1353e60..2c5ad9083d 100755
--- a/tools/virt-qemu-sev-validate.py
+++ b/tools/virt-qemu-sev-validate.py
@@ -36,18 +36,21 @@
 
 import abc
 import argparse
-from base64 import b64decode
+from base64 import b64decode, b64encode
 from hashlib import sha256
 import hmac
 import libvirt
 import logging
 from lxml import etree
+import os
 import re
 import socket
 from struct import pack
 import sys
 import traceback
 from uuid import UUID
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+
 
 log = logging.getLogger()
 
@@ -580,7 +583,26 @@ class KernelTable(GUIDTable):
         return entries
 
 
-class ConfidentialVM(object):
+class SecretsTable(GUIDTable):
+
+    TABLE_GUID = UUID('{1e74f542-71dd-4d66-963e-ef4287ff173b}').bytes_le
+    DISK_PW_GUID = UUID('{736869e5-84f0-4973-92ec-06879ce3da0b}').bytes_le
+
+    def __init__(self):
+        super().__init__(guid=self.TABLE_GUID,
+                         lenlen=4)
+        self.disk_password = None
+
+    def load_disk_password(self, path):
+        with open(path, 'rb') as fh:
+            self.disk_password = fh.read()
+
+    def entries(self):
+        return self.build_entry(self.DISK_PW_GUID,
+                                self.disk_password + bytes([0]), 4)
+
+
+class ConfidentialVM(abc.ABC):
     POLICY_BIT_SEV_ES = 2
     POLICY_VAL_SEV_ES = (1 << POLICY_BIT_SEV_ES)
 
@@ -606,6 +628,7 @@ class ConfidentialVM(object):
         self.vmsa_cpu1 = None
 
         self.kernel_table = KernelTable()
+        self.secrets_table = SecretsTable()
 
     def is_sev_es(self):
         return self.policy & self.POLICY_VAL_SEV_ES
@@ -758,6 +781,82 @@ class ConfidentialVM(object):
             raise AttestationFailedException(
                 "Measurement does not match, VM is not trustworthy")
 
+    def build_secrets(self):
+        measurement, _ = self.get_measurements()
+
+        iv = os.urandom(16)
+
+        secret_table = self.secrets_table.build()
+
+        cipher = Cipher(algorithms.AES(self.tek), modes.CTR(iv))
+        enc = cipher.encryptor()
+        secret_table_ciphertext = (enc.update(secret_table) +
+                                   enc.finalize())
+
+        flags = 0
+
+        ##
+        # Table 55. LAUNCH_SECRET Packet Header Buffer
+        ##
+        header = (
+            flags.to_bytes(4, byteorder='little') +
+            iv
+        )
+
+        # AMD Secure Encrypted Virtualization API , section 6.6
+        #
+        #  hdrmac = HMAC(0x01 || FLAGS || IV || GUEST_LENGTH ||
+        #                TRANS_LENGTH || DATA ||
+        #                MEASURE; GCTX.TIK)
+        #
+        msg = (
+            bytes([0x01]) +
+            flags.to_bytes(4, byteorder='little') +
+            iv +
+            len(secret_table).to_bytes(4, byteorder='little') +
+            len(secret_table).to_bytes(4, byteorder='little') +
+            secret_table_ciphertext +
+            measurement
+        )
+
+        h = hmac.new(self.tik, msg, 'sha256')
+        header = (
+            flags.to_bytes(4, byteorder='little') +
+            iv +
+            h.digest()
+        )
+
+        header64 = b64encode(header).decode('utf8')
+        secret64 = b64encode(secret_table_ciphertext).decode('utf8')
+        log.debug("Header: %s (%d bytes)" % (header64, len(header)))
+        log.debug("Secret: %s (%d bytes)" % (
+            secret64, len(secret_table_ciphertext)))
+
+        return header64, secret64
+
+    @abc.abstractmethod
+    def inject_secrets(self):
+        pass
+
+
+class OfflineConfidentialVM(ConfidentialVM):
+    def __init__(self,
+                 secret_header=None,
+                 secret_payload=None,
+                 **kwargs):
+        super().__init__(**kwargs)
+
+        self.secret_header = secret_header
+        self.secret_payload = secret_payload
+
+    def inject_secrets(self):
+        header64, secret64 = self.build_secrets()
+
+        with open(self.secret_header, "wb") as fh:
+            fh.write(header64.encode('utf8'))
+        with open(self.secret_payload, "wb") as fh:
+            fh.write(secret64.encode('utf8'))
+
 
 class LibvirtConfidentialVM(ConfidentialVM):
     def __init__(self, **kwargs):
@@ -937,6 +1036,14 @@ class LibvirtConfidentialVM(ConfidentialVM):
                 cpu_stepping = int(sig[0].get("stepping"))
                 self.build_vmsas(cpu_family, cpu_model, cpu_stepping)
 
+    def inject_secrets(self):
+        header64, secret64 = self.build_secrets()
+
+        params = {"sev-secret": secret64,
+                  "sev-secret-header": header64}
+        self.dom.setLaunchSecurityState(params, 0)
+        self.dom.resume()
+
 
 def parse_command_line():
     parser = argparse.ArgumentParser(
@@ -999,6 +1106,14 @@ def parse_command_line():
     vmconn.add_argument('--ignore-config', '-g', action='store_true',
                         help='Do not attempt to sanity check the guest config')
 
+    # Arguments related to secret injection
+    parser.add_argument('--disk-password', '-s',
+                        help='Path to LUKS disk password to inject')
+    parser.add_argument('--secret-payload',
+                        help='Path to file to write secret data payload to')
+    parser.add_argument('--secret-header',
+                        help='Path to file to write secret data header to')
+
     return parser.parse_args()
 
 
@@ -1039,6 +1154,15 @@ def check_usage(args):
             raise UnsupportedUsageException(
                 "Either --firmware or --domain is required")
 
+        if args.disk_password is not None:
+            if args.secret_header is None:
+                raise UnsupportedUsageException(
+                    "Either --secret-header or --domain is required")
+
+            if args.secret_payload is None:
+                raise UnsupportedUsageException(
+                    "Either --secret-payload or --domain is required")
+
     sku = [args.cpu_family, args.cpu_model, args.cpu_stepping]
     if sku.count(None) == len(sku):
         if args.vmsa_cpu1 is not None and args.vmsa_cpu0 is None:
@@ -1053,15 +1177,22 @@ def check_usage(args):
             raise UnsupportedUsageException(
                 "CPU SKU needs family, model and stepping for SEV-ES domain")
 
+    secret = [args.secret_payload, args.secret_header]
+    if secret.count(None) > 0 and secret.count(None) != len(secret):
+        raise UnsupportedUsageException(
+            "Both --secret-payload and --secret-header are required")
+
 
 def attest(args):
     if args.domain is None:
-        cvm = ConfidentialVM(measurement=args.measurement,
-                             api_major=args.api_major,
-                             api_minor=args.api_minor,
-                             build_id=args.build_id,
-                             policy=args.policy,
-                             num_cpus=args.num_cpus)
+        cvm = OfflineConfidentialVM(measurement=args.measurement,
+                                    api_major=args.api_major,
+                                    api_minor=args.api_minor,
+                                    build_id=args.build_id,
+                                    policy=args.policy,
+                                    num_cpus=args.num_cpus,
+                                    secret_header=args.secret_header,
+                                    secret_payload=args.secret_payload)
     else:
         cvm = LibvirtConfidentialVM(measurement=args.measurement,
                                     api_major=args.api_major,
@@ -1105,10 +1236,16 @@ def attest(args):
                         args.ignore_config)
 
     cvm.attest()
-
     if not args.quiet:
         print("OK: Looks good to me")
 
+    if args.disk_password:
+        cvm.secrets_table.load_disk_password(args.disk_password)
+
+        cvm.inject_secrets()
+        if not args.quiet:
+            print("OK: Injected password")
+
 
 if __name__ == "__main__":
     args = parse_command_line()
-- 
2.37.3



More information about the libvir-list mailing list