"""********************************************************************************
* Copyright (c) 2026 the Qrisp authors
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License, v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is
* available at https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************
"""
import warnings
from qiskit import QuantumCircuit as QiskitQuantumCircuit
from qrisp.circuit.quantum_circuit import QuantumCircuit
from qrisp.interface.backend import Backend
from qrisp.interface.job import (
Job,
JobCancelledError,
JobFailureError,
JobResult,
JobStatus,
)
__all__ = ["AQTBackend", "AQTJob"]
def _map_aqt_status(aqt_job) -> JobStatus:
    """Convert the status of an AQT (Qiskit-compatible) job into a :class:`~qrisp.interface.JobStatus`.

    The job is asked for its status via ``aqt_job.status()``. The returned
    object's ``name`` attribute is used as the lookup key when present;
    otherwise the upper-cased string form of the object is used.

    ``VALIDATING`` has no direct Qrisp equivalent and is reported as ``QUEUED``.
    Any status that is not recognised, as well as any exception raised while
    querying the job, results in ``RUNNING``.

    Parameters
    ----------
    aqt_job
        Object exposing a ``status()`` method.

    Returns
    -------
    JobStatus
    """
    # Best effort: a failing status query is reported as RUNNING, not raised.
    try:
        reported = aqt_job.status()
    except Exception:
        return JobStatus.RUNNING

    # Enum-like statuses carry a ``name``; anything else is stringified.
    try:
        key = reported.name
    except AttributeError:
        key = str(reported).upper()

    translation = {
        "INITIALIZING": JobStatus.INITIALIZING,
        "QUEUED": JobStatus.QUEUED,
        "VALIDATING": JobStatus.QUEUED,
        "RUNNING": JobStatus.RUNNING,
        "DONE": JobStatus.DONE,
        "CANCELLED": JobStatus.CANCELLED,
        "ERROR": JobStatus.ERROR,
    }
    return translation.get(key, JobStatus.RUNNING)
class AQTJob(Job):
    """A :class:`~qrisp.interface.Job` that wraps a single AQT primitive job.

    One ``AQTJob`` is created per :meth:`AQTBackend.run_async` call.
    The underlying AQT job is already submitted before this object is returned;
    :meth:`submit` only updates :attr:`~qrisp.interface.Job.last_known_status` to ``QUEUED``.

    :meth:`result` blocks until the AQT hardware finishes, delegating the wait
    to the AQT job's own ``result()`` call.

    .. note::

        AQT does not expose a job cancellation API via its Qiskit-compatible
        sampler. :meth:`cancel` therefore always returns ``False``.
    """

    def __init__(self, backend: "AQTBackend", aqt_job, cl_bits_per_circuit: list[int]):
        """Initialise the wrapper with the Qrisp backend, the AQT job, and the
        per-circuit classical-bit counts needed to format result bitstrings.

        Parameters
        ----------
        backend : AQTBackend
            The Qrisp backend that created this job.
        aqt_job
            The already-submitted AQT job; must expose ``status()`` and
            ``result()``.
        cl_bits_per_circuit : list[int]
            Width (number of classical bits) of each submitted circuit, in
            submission order. Used to zero-pad the result bitstrings.
        """
        super().__init__(backend=backend)
        self._aqt_job = aqt_job
        self._cl_bits_per_circuit = cl_bits_per_circuit

    # ------------------------------------------------------------------
    # Abstract interface
    # ------------------------------------------------------------------

    def submit(self) -> None:
        """Mark the job as QUEUED. The AQT job is already submitted by the sampler in run_async()."""
        self._last_known_status = JobStatus.QUEUED

    def result(self, timeout: float | None = None) -> JobResult:
        """Block until the AQT job finishes and return the :class:`~qrisp.interface.JobResult`.

        Waiting is delegated to the AQT job's own blocking ``result()`` call.

        Parameters
        ----------
        timeout : float or None, optional
            Not currently forwarded to the AQT job. Included for interface
            compatibility. Pass ``None`` (the default) to wait indefinitely.

        Returns
        -------
        JobResult
            One dictionary per circuit, mapping zero-padded bitstrings to the
            quasi-probabilities reported by AQT.

        Raises
        ------
        TimeoutError
            Re-raised unchanged if the AQT job's ``result()`` raises it.
        JobFailureError
            If the AQT job failed or raised an exception.
        JobCancelledError
            If the AQT job was cancelled.
        """
        try:
            aqt_result = self._aqt_job.result()
        except TimeoutError:
            # A timeout is not a failure of the job itself: record whatever
            # AQT currently reports and let the caller decide what to do.
            self._last_known_status = _map_aqt_status(self._aqt_job)
            raise
        except Exception as exc:
            terminal_status = _map_aqt_status(self._aqt_job)
            self._last_known_status = terminal_status
            if terminal_status == JobStatus.CANCELLED:
                raise JobCancelledError(f"AQT job {self._job_id!r} was cancelled.") from exc
            raise JobFailureError(f"AQT job {self._job_id!r} failed: {exc}") from exc

        self._last_known_status = JobStatus.DONE

        # Outcomes are integers; render each as a bitstring padded to the
        # classical-register width of the circuit it belongs to.
        result_dicts = [
            {bin(outcome)[2:].zfill(self._cl_bits_per_circuit[i]): prob for outcome, prob in quasi_dist.items()}
            for i, quasi_dist in enumerate(aqt_result.quasi_dists)
        ]
        return JobResult(result_dicts)

    def cancel(self) -> bool:
        """AQT does not expose a job cancellation API.

        Returns
        -------
        bool
            Always ``False``.
        """
        return False

    def status(self) -> JobStatus:
        """Return the current :class:`~qrisp.interface.JobStatus` by querying the AQT job.

        Once a terminal status (``DONE``, ``CANCELLED`` or ``ERROR``) has been
        recorded it is returned directly without contacting AQT again.
        ``_map_aqt_status`` reports ``RUNNING`` when the status query itself
        fails, so re-querying a finished job could otherwise make it appear
        to be running again after a transient error.
        """
        recorded = getattr(self, "_last_known_status", None)
        if recorded in (JobStatus.DONE, JobStatus.CANCELLED, JobStatus.ERROR):
            return recorded
        self._last_known_status = _map_aqt_status(self._aqt_job)
        return self._last_known_status
class AQTBackend(Backend):
    """A :class:`~qrisp.interface.Backend` that executes circuits on AQT
    quantum hardware via `AQT ARNICA <https://www.aqt.eu/products/arnica/>`_.

    Circuits are transpiled from Qrisp's internal representation to Qiskit
    ``QuantumCircuit`` objects and submitted through AQT's ``AQTSampler``
    primitive. An ``AQTJob`` handle is returned immediately. Call
    :meth:`Job.result` to block and retrieve the
    :class:`~qrisp.interface.JobResult`.

    For further details, see the :ref:`Backend` documentation
    and the `AQT ARNICA <https://www.aqt.eu/products/arnica/>`_ website.

    .. note::

        Requires the optional dependency ``qiskit-aqt-provider``::

            pip install qiskit-aqt-provider

    .. note::

        ``AQTSampler`` returns quasi-probability distributions
        (float values that sum to 1.0) rather than integer counts.
        These are returned as-is and are handled by Qrisp's measurement
        normalisation logic.

    Parameters
    ----------
    api_token : str
        API token for AQT ARNICA.
    device_instance : str
        The device instance to target, e.g. ``"ibex"`` or
        ``"simulator_noise"``. The offline simulators
        ``"offline_simulator_no_noise"`` and ``"offline_simulator_noise"``
        are available with any API token.
        For an up-to-date list see the AQT ARNICA website.
    workspace : str or None, optional
        The workspace identifier for your company or project.

    Examples
    --------
    We evaluate a :ref:`QuantumFloat` multiplication on the 12-qubit AQT IBEX:

    >>> from qrisp import QuantumFloat
    >>> from qrisp.interface import AQTBackend
    >>> qrisp_ibex = AQTBackend(
    ...     api_token="YOUR_AQT_ARNICA_TOKEN",
    ...     device_instance="ibex",
    ...     workspace="YOUR_COMPANY_OR_PROJECT_NAME",
    ... )
    >>> a = QuantumFloat(2)
    >>> a[:] = 2
    >>> b = a * a
    >>> b.get_measurement(backend=qrisp_ibex, shots=100)
    {4: 0.49, 8: 0.11, 2: 0.08, 0: 0.06, ...}
    """

    def __init__(self, api_token: str, device_instance: str, workspace: str | None = None):
        """Validate the arguments, resolve the AQT device and register the backend name.

        Raises
        ------
        TypeError
            If ``api_token`` or ``device_instance`` is not a string, or if
            ``workspace`` is neither ``None`` nor a string.
        ImportError
            If ``qiskit-aqt-provider`` is not installed.
        """
        if not isinstance(api_token, str):
            raise TypeError("api_token must be a string. You can create an API token on the AQT ARNICA website.")
        if workspace is not None and not isinstance(workspace, str):
            raise TypeError("workspace must be a string.")
        if not isinstance(device_instance, str):
            raise TypeError(
                "Please provide a device_instance as a string. "
                "You can retrieve a list of available devices on the AQT ARNICA website."
            )

        # Imported lazily so that the optional dependency is only required
        # when an AQTBackend is actually constructed.
        try:
            from qiskit_aqt_provider import AQTProvider
            from qiskit_aqt_provider.primitives import AQTSampler
        except ImportError as exc:
            raise ImportError(
                "Please install qiskit-aqt-provider to use AQTBackend: pip install qiskit-aqt-provider"
            ) from exc

        provider = AQTProvider(api_token)
        self._aqt_device = provider.get_backend(name=device_instance, workspace=workspace)
        # The sampler *class* is stored; a fresh instance is built per run_async() call.
        self._aqt_sampler = AQTSampler

        # ``name`` may be a plain string attribute or a callable returning one.
        name = self._aqt_device.name if isinstance(self._aqt_device.name, str) else self._aqt_device.name()
        super().__init__(name=name)

    @classmethod
    def _default_options(cls):
        """Return the default runtime options (shots=100, matching the original AQT default)."""
        return {"shots": 100}

    @property
    def max_circuits(self) -> int | None:
        """Maximum circuits per job, as reported by the underlying AQT device.

        Delegates to the AQT device's own ``max_circuits`` attribute, which is
        part of the Qiskit-compatible ``BackendV2`` interface that AQT exposes.
        Returns ``None`` if the device does not expose a limit or the exposed
        value is not an ``int``.
        """
        value = getattr(self._aqt_device, "max_circuits", None)
        return value if isinstance(value, int) else None

    def run_async(self, circuits, shots: int | list[int] | None = None) -> AQTJob:
        """Transpile and submit one or more circuits to the AQT backend.

        This method returns an ``AQTJob`` immediately. Call
        ``Job.result`` on the returned object to block and retrieve the
        :class:`~qrisp.interface.JobResult`.

        Parameters
        ----------
        circuits : QuantumCircuit or Sequence[QuantumCircuit]
            One Qrisp circuit or a sequence of Qrisp circuits to execute.
        shots : int or list[int] or None, optional
            Number of shots. If ``None``, the backend's ``shots`` option is
            used. If a ``list[int]`` is provided, the AQT sampler does not
            support per-circuit shot counts, so all circuits are run at
            ``max(shots)`` and a ``UserWarning`` is emitted.

        Returns
        -------
        AQTJob
        """
        if isinstance(circuits, QuantumCircuit):
            circuits = [circuits]
        else:
            circuits = list(circuits)

        if isinstance(shots, list):
            self._validate_shots_length(shots, circuits)
            n_shots = max(shots)
            warnings.warn(
                "AQTBackend does not support per-circuit shot counts. "
                f"Running all {len(circuits)} circuits at max(shots)={n_shots}.",
                UserWarning,
                stacklevel=2,
            )
        else:
            self._validate_shots(shots)
            n_shots = shots if shots is not None else self._options.get("shots", 100)

        self._check_circuit_limit(circuits)

        # Transpile each Qrisp circuit and convert to a Qiskit QuantumCircuit.
        # Re-index to a single monolithic register to avoid register-name issues.
        qiskit_circuits = []
        cl_bits_per_circuit = []
        for qc in circuits:
            qiskit_qc = qc.transpile().to_qiskit()

            # Build the bit -> position tables once per circuit so each
            # instruction operand is resolved in O(1) instead of scanning the
            # bit lists with ``list.index`` for every operand.
            qubit_position = {qb: pos for pos, qb in enumerate(qiskit_qc.qubits)}
            clbit_position = {cb: pos for pos, cb in enumerate(qiskit_qc.clbits)}

            new_qiskit_qc = QiskitQuantumCircuit(len(qiskit_qc.qubits), len(qiskit_qc.clbits))
            for instr in qiskit_qc:
                new_qiskit_qc.append(
                    instr.operation,
                    [qubit_position[qb] for qb in instr.qubits],
                    [clbit_position[cb] for cb in instr.clbits],
                )

            qiskit_circuits.append(new_qiskit_qc)
            cl_bits_per_circuit.append(len(qiskit_qc.clbits))

        sampler = self._aqt_sampler(self._aqt_device)
        sampler.set_transpile_options(optimization_level=3)
        aqt_job = sampler.run(qiskit_circuits, shots=n_shots)

        job = AQTJob(
            backend=self,
            aqt_job=aqt_job,
            cl_bits_per_circuit=cl_bits_per_circuit,
        )
        # The AQT job is already in flight; submit() only records QUEUED.
        job.submit()
        return job