"""********************************************************************************
* Copyright (c) 2026 the Qrisp authors
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License, v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is
* available at https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************
"""
from uuid import UUID
from qrisp.circuit.quantum_circuit import QuantumCircuit
from qrisp.interface.backend import Backend
from qrisp.interface.job import (
JOB_FINAL_STATES,
Job,
JobCancelledError,
JobFailureError,
JobResult,
JobStatus,
)
# ---------------------------------------------------------------------------
# TEMPORARY IMPLEMENTATION — TO BE REMOVED
#
# This module is a stop-gap. Once the QCCSW release from IQM is published,
# the classes below will be deleted and replaced by a direct import.
#
# Do NOT extend or refactor this file; any effort spent here is wasted.
# ---------------------------------------------------------------------------
__all__ = ["IQMBackend", "IQMJob"]
def _map_iqm_status(iqm_job) -> JobStatus:
"""Translate an IQM ``CircuitJob`` status to :class:`~qrisp.interface.JobStatus`."""
try:
raw = iqm_job.status
name = raw.value if hasattr(raw, "value") else str(raw).lower()
except Exception:
return JobStatus.RUNNING
_status_map = {
"waiting": JobStatus.QUEUED,
"processing": JobStatus.RUNNING,
"completed": JobStatus.DONE,
"failed": JobStatus.ERROR,
"cancelled": JobStatus.CANCELLED,
}
return _status_map.get(name, JobStatus.RUNNING)
class IQMJob(Job):
"""A :class:`~qrisp.interface.Job` that wraps a single IQM ``CircuitJob``.
.. warning::
**Temporary implementation.** This class exists only until the IQM
QCCSW release ships the new Qrisp-compatible ``IQMBackend``.
At that point, this file will be deleted and ``IQMBackend`` will be imported
from that package instead. Do not depend on implementation details here.
One ``IQMJob`` is created per :meth:`IQMBackend.run_async` call.
The underlying IQM job is already submitted before this object is returned;
:meth:`submit` only updates :attr:`~qrisp.interface.Job.last_known_status`
to ``QUEUED``.
:meth:`result` blocks until IQM hardware finishes by delegating to
``CircuitJob.wait_for_completion()``. Counts are fetched from the server
in histogram form via ``IQMClient.get_job_measurement_counts``.
"""
def __init__(
self,
backend: "IQMBackend",
iqm_job,
shots_per_circuit: list[int],
client,
):
"""Initialise the wrapper.
Parameters
----------
backend : IQMBackend
The backend that created this job.
iqm_job : CircuitJob
The IQM client job returned by ``IQMClient.submit_circuits``.
shots_per_circuit : list[int]
Per-circuit shot counts (used to record the max sent to the IQM API).
client : IQMClient
The IQM client used to fetch measurement counts after completion.
Stored directly to avoid accessing a protected member of the backend.
"""
super().__init__(backend=backend, job_id=str(iqm_job.job_id))
self._iqm_job = iqm_job
self._shots_per_circuit = shots_per_circuit
self._iqm_client = client
self._cached_result: JobResult | None = None
# ------------------------------------------------------------------
# Abstract interface
# ------------------------------------------------------------------
def submit(self) -> None:
"""Mark the job as QUEUED. The IQM job is already submitted by the client in run_async()."""
self._last_known_status = JobStatus.QUEUED
def result(self, timeout: float | None = None) -> JobResult:
"""Block until the IQM job finishes and return the :class:`~qrisp.interface.JobResult`.
Waiting is delegated to ``CircuitJob.wait_for_completion()``.
Counts are fetched via ``IQMClient.get_job_measurement_counts``,
which returns per-circuit histograms (bitstring → count) directly.
Parameters
----------
timeout : float or None, optional
Not currently forwarded to the IQM client. Included for interface
compatibility. Pass ``None`` (the default) to wait indefinitely.
Returns
-------
JobResult
Raises
------
JobFailureError
If the IQM job failed.
JobCancelledError
If the IQM job was cancelled.
"""
if self._cached_result is not None:
return self._cached_result
try:
from iqm.iqm_client.iqm_client import JobStatus as IQMJobStatus
final_iqm_status = self._iqm_job.wait_for_completion()
if final_iqm_status == IQMJobStatus.CANCELLED:
self._last_known_status = JobStatus.CANCELLED
raise JobCancelledError(f"IQM job {self._job_id!r} was cancelled.")
if final_iqm_status != IQMJobStatus.COMPLETED:
self._last_known_status = JobStatus.ERROR
raise JobFailureError(f"IQM job {self._job_id!r} failed with status {final_iqm_status.value!r}.")
except (JobCancelledError, JobFailureError):
raise
except Exception as exc:
self._last_known_status = JobStatus.ERROR
raise JobFailureError(f"IQM job {self._job_id!r} failed: {exc}") from exc
self._last_known_status = JobStatus.DONE
counts_batch_raw = self._iqm_client.get_job_measurement_counts(self._iqm_job.job_id)
counts_batch = []
for c in counts_batch_raw:
counts_dic = {}
for key, val in c.counts.items():
counts_dic[key[::-1]] = val
counts_batch.append(counts_dic)
self._cached_result = JobResult(counts_batch)
return self._cached_result
def cancel(self) -> bool:
"""Attempt to cancel the underlying IQM job.
Returns
-------
bool
``True`` if the cancellation request was dispatched successfully;
``False`` if the job is already in a terminal state or cancellation
failed.
"""
if self._last_known_status in JOB_FINAL_STATES:
return False
try:
self._iqm_job.cancel()
self._last_known_status = JobStatus.CANCELLED
return True
except Exception:
return False
def status(self) -> JobStatus:
"""Return the current :class:`~qrisp.interface.JobStatus` by querying the IQM server."""
try:
self._iqm_job.update()
except Exception:
pass
self._last_known_status = _map_iqm_status(self._iqm_job)
return self._last_known_status
[docs]
class IQMBackend(Backend):
"""A :class:`~qrisp.interface.Backend` for executing circuits on IQM hardware.
.. warning::
**Temporary implementation.** This class exists only until the IQM
QCCSW release ships the new Qrisp-compatible ``IQMBackend``.
At that point, this file will be deleted and ``IQMBackend`` will be imported
from that package instead. Do not depend on implementation details here.
.. deprecated:: 0.8
``IQMBackend`` will be removed from qrisp in a future release once the
IQM QCCSW package is publicly available.
Circuits are transpiled from Qrisp's internal representation to Qiskit
``QuantumCircuit`` objects, serialized via the IQM provider backend, and
submitted through the IQM client. An ``IQMJob`` handle is returned
immediately. Call :meth:`Job.result` to block and retrieve the
:class:`~qrisp.interface.JobResult`.
Parameters
----------
api_token : str
An API token retrieved from the IQM Resonance website or IQM backend.
device_instance : str, optional
The device instance of the IQM backend such as ``garnet``.
For an up-to-date list, see the IQM Resonance website.
Required if ``server_url`` is not provided.
server_url : str, optional
The server URL of the IQM backend. If not provided, it defaults to IQM
Resonance using the ``device_instance``. If a server URL is provided,
a device instance should not be provided.
compilation_options : CircuitCompilationOptions, optional
An object to specify several options regarding pulse-level compilation.
transpiler : callable, optional
A function receiving and returning a QuantumCircuit, mapping the given
circuit to a hardware friendly circuit. By default the
``transpile_to_IQM`` function will be used.
calibration_set_id : str or UUID or None, optional
ID of the calibration set the backend will use. ``None`` means the IQM
Server will be queried for the current default calibration set.
use_metrics : bool, optional
If ``True``, the backend will query the server for calibration data and
related quality metrics. Defaults to ``False``.
use_timeslot : bool, optional
Passed to ``IQMClient.submit_circuits``. Defaults to ``False``.
Examples
--------
We evaluate a :ref:`QuantumFloat` multiplication on the 20-qubit IQM Garnet.
>>> from qrisp.interface import IQMBackend
>>> qrisp_garnet = IQMBackend(
... api_token="YOUR_IQM_RESONANCE_TOKEN", device_instance="garnet"
... )
>>> from qrisp import QuantumFloat
>>> a = QuantumFloat(2)
>>> a[:] = 2
>>> b = a*a
>>> b.get_measurement(backend = qrisp_garnet, shots = 1000)
{4: 0.548, ...}
**Manual qubit selection and routing**
::
from qrisp import QuantumCircuit
def custom_transpiler(qc: QuantumCircuit) -> QuantumCircuit:
return qc.transpile(basis_gates = ["cz", "r", "measure", "reset"])
custom_transpiled_garnet = IQMBackend("YOUR_IQM_RESONANCE_TOKEN",
device_instance = "garnet",
transpiler = custom_transpiler)
qc = QuantumCircuit(2)
qc.h(0)
qc.cx(0, 1)
qc.measure(0)
meas_res = qc.run(shots = 10000, backend = custom_transpiled_garnet)
"""
def __init__(
self,
api_token: str,
device_instance: str | None = None,
server_url: str | None = None,
compilation_options=None,
transpiler=None,
calibration_set_id: str | UUID | None = None,
use_metrics: bool = False,
use_timeslot: bool = False,
):
if not isinstance(api_token, str):
raise TypeError("api_token must be a string. You can create an API token on the IQM Resonance website.")
if server_url is not None and device_instance is not None:
raise ValueError("Please provide either a server_url or a device_instance, but not both.")
if server_url is None and device_instance is None:
raise ValueError("Please provide either a server_url or a device_instance.")
if device_instance is not None and not isinstance(device_instance, str):
raise TypeError(
"device_instance must be a string. "
"You can retrieve a list of available devices on the IQM Resonance website."
)
if server_url is not None and not isinstance(server_url, str):
raise TypeError("server_url must be a string.")
try:
from iqm.iqm_client import CircuitCompilationOptions
from iqm.iqm_client.iqm_client import IQMClient
from iqm.qiskit_iqm import transpile_to_IQM
from iqm.qiskit_iqm.iqm_provider import IQMBackend as _IQMProviderBackend
except ImportError as exc:
raise ImportError(
"Please install qiskit-iqm to use the IQMBackend. You can do this by running `pip install qrisp[iqm]`."
) from exc
if server_url is None:
server_url = "https://resonance.meetiqm.com/"
self._client = IQMClient(iqm_server_url=server_url, token=api_token, quantum_computer=device_instance)
self._iqm_provider_backend = _IQMProviderBackend(
self._client,
calibration_set_id=calibration_set_id,
use_metrics=use_metrics,
)
if compilation_options is None:
compilation_options = CircuitCompilationOptions()
self._compilation_options = compilation_options
self._use_timeslot = use_timeslot
self._device_instance = device_instance
self._calibration_set_id = calibration_set_id
self._transpile_to_iqm = transpile_to_IQM
if transpiler is None:
def transpiler(qc):
qiskit_qc = qc.to_qiskit()
transpiled_qiskit_qc = transpile_to_IQM(qiskit_qc, self._iqm_provider_backend)
return QuantumCircuit.from_qiskit(transpiled_qiskit_qc)
self._transpiler = transpiler
name = device_instance or server_url
super().__init__(name=name)
@classmethod
def _default_options(cls):
"""Return the default runtime options (shots=1000)."""
return {"shots": 1000}
def run_async(
self,
circuits,
shots: int | list[int] | None = None,
) -> IQMJob:
"""Transpile and submit one or more circuits to the IQM backend.
This method returns an ``IQMJob`` immediately. Call
:meth:`Job.result` on the returned object to block and retrieve the
:class:`~qrisp.interface.JobResult`.
Parameters
----------
circuits : QuantumCircuit or Sequence[QuantumCircuit]
One Qrisp circuit or a sequence of Qrisp circuits to execute.
shots : int or list[int] or None, optional
Number of shots. If ``None``, the backend's ``shots`` option is
used. If a ``list[int]`` is provided, each entry specifies the shot
count for the circuit at the corresponding index. The IQM client
always executes ``max(shots)`` repetitions for the entire batch.
Returns
-------
IQMJob
"""
if isinstance(circuits, QuantumCircuit):
circuits = [circuits]
else:
circuits = list(circuits)
if isinstance(shots, list):
self._validate_shots_length(shots, circuits)
shots_per_circuit = shots
else:
self._validate_shots(shots)
n_shots = shots if shots is not None else self._options.get("shots", 1000)
shots_per_circuit = [n_shots] * len(circuits)
self._check_circuit_limit(circuits)
circuit_batch = []
for qc in circuits:
if self._device_instance == "sirius":
qiskit_qc = self._transpile_to_iqm(qc.to_qiskit(), self._iqm_provider_backend)
else:
transpiled_qc = self._transpiler(qc)
qiskit_qc = transpiled_qc.to_qiskit()
circuit_batch.append(self._iqm_provider_backend.serialize_circuit(qiskit_qc))
iqm_job = self._client.submit_circuits(
circuit_batch,
calibration_set_id=self._calibration_set_id,
options=self._compilation_options,
shots=max(shots_per_circuit),
use_timeslot=self._use_timeslot,
)
job = IQMJob(
backend=self,
iqm_job=iqm_job,
shots_per_circuit=shots_per_circuit,
client=self._client,
)
job.submit()
return job