Source code for qrisp.interface.qunicorn.backend_server

"""
\********************************************************************************
* Copyright (c) 2023 the Qrisp authors
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License, v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is
* available at https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/
"""

from flask import Flask, request, jsonify, make_response
import yaml
from datetime import date, datetime
import threading

"""

This file sets up a server adhering to the interface specified by the Qunicorn middleware
developed in the SeQuenC project: https://sequenc.de/

If you are a backend provider, we don't recommend using this server for production,
since it only exposes the necessities to run quantum circuits and nothing else.
Instead check out the Qunicorn GitHub: https://github.com/qunicorn/qunicorn-core
And its documentation on how to create new Pilots:
https://qunicorn-core.readthedocs.io/en/latest/tutorials/pilot_tutorial_demo.html

"""

# Returns the host's IP address
def get_ip():
    """
    Determine the IP address of the machine running this code.

    A UDP socket is "connected" to an arbitrary address and the local address
    the socket ends up bound to is read back. If anything goes wrong along
    the way, the loopback address is returned instead.

    Returns
    -------
    str
        The detected IP address, or ``"127.0.0.1"`` as a fallback.
    """
    import socket

    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    probe.settimeout(0)
    try:
        # The target address does not have to be reachable
        probe.connect(("10.255.255.255", 1))
        return probe.getsockname()[0]
    except Exception:
        return "127.0.0.1"
    finally:
        # Runs on both return paths, so the socket is always released
        probe.close()


class BackendServer:
    """
    This class allows convenient setup of a server respecting the
    `Qunicorn <https://qunicorn-core.readthedocs.io/en/latest/index.html>`_
    interface.

    Parameters
    ----------
    run_func : function
        A function that receives the quantum circuit of a deployed program
        (the example below treats it as an OpenQASM string), an integer
        specifying the amount of shots and a token in the form of a string.
        It returns the counts as a dictionary of bitstrings.
    ip_address : str, optional
        The IP address of where the listening socket should be opened. By
        default, the IP address of the hosting machine will be used.
    port : int, optional
        The port on which to listen for requests. By default, 9010 will be
        used.
    is_simulator : bool, optional
        A bool specifying whether the results returned by this server are due
        to a simulator. The default is True.

    Examples
    --------

    We create a server listening on the localhost IP address using a run
    function which prints the token and queries the Aer-simulator.

    ::

        def run_func(qasm_str, shots, token):

            print(token)

            from qiskit import QuantumCircuit

            #Convert to qiskit
            qiskit_qc = QuantumCircuit.from_qasm_str(qasm_str)

            from qiskit_aer import AerSimulator
            qiskit_backend = AerSimulator()

            #Run Circuit on the Qiskit backend
            return qiskit_backend.run(qiskit_qc, shots = shots).result().get_counts()

        from qrisp.interface import BackendServer
        example_server = BackendServer(run_func, ip_address = "127.0.0.1", port = 8080)
        example_server.start()

    """

    def __init__(self, run_func, ip_address=None, port=None, is_simulator=True):
        self.app = Flask(__name__)

        # Deployments and jobs are identified by their index in these lists
        self.deployments = []
        self.jobs = []

        if ip_address is None:
            ip_address = get_ip()

        if port is None:
            port = 9010

        self.port = port
        self.ip_address = ip_address
        self.run_func = run_func
        self.is_simulator = is_simulator

        # To set up the server we route the relevant functions for the flask app.

        # This function receives the deployment data and saves it in the
        # deployments attribute
        @self.app.route("/deployments", methods=["POST"])
        def deployments():
            # Receive the quantum circuit data from the request
            data = request.get_json()

            # Append the deployment data to the library
            self.deployments.append(data)

            # Form the response dictionary
            response = {
                "id": len(self.deployments) - 1,
                "deployedBy": {"id": 0, "name": ""},
                "deployedAt": str(date.today()),
                "name": "",
                "programs": data["programs"],
            }

            return make_response(jsonify(response), 201)

        # This function receives the deployment id of a job and starts the
        # execution
        @self.app.route("/jobs", methods=["POST"])
        def jobs_post():
            # Receive the job data from the request
            data = request.get_json()

            # Extract the deployment. Unknown ids are rejected explicitly:
            # a plain list lookup would raise IndexError (HTTP 500) or, for
            # negative ids, silently pick a deployment from the end.
            deployment_id = data["deploymentId"]
            if (
                not isinstance(deployment_id, int)
                or not 0 <= deployment_id < len(self.deployments)
            ):
                return jsonify({"error": "Deployment not found"}), 404

            deployment = self.deployments[deployment_id]

            # Create the dictionary to hold information about the job
            job_dic = {
                "deploymentId": deployment_id,
                "counts": [],
                "shots": data["shots"],
                "exception": None,
                "quantumCircuits": deployment["programs"],
                "start_time": str(datetime.now()),
                "end_time": "-",
            }

            # Create the thread for asynchronous execution of the job
            run_thread = threading.Thread(
                target=self._execute_job,
                args=(job_dic, deployment["programs"], data),
            )
            job_dic["run_thread"] = run_thread

            # Start the thread
            run_thread.start()

            # Append the job to the jobs attribute
            self.jobs.append(job_dic)

            # Form the response
            response = {
                "id": len(self.jobs) - 1,
                "jobName": "",
                "jobState": "running",
            }

            return make_response(jsonify(response), 201)

        # This function retrieves information about a certain job. It is used
        # by the client to inquire the results of the job
        @self.app.route("/jobs/<int:job_id>/", methods=["GET"])
        def get_job(job_id):
            # Search for the job with the specified job_id
            if not job_id < len(self.jobs):
                return jsonify({"error": "Job not found"}), 404

            # Get job information
            job = self.jobs[job_id]

            # Find out whether the job finished. The thread state is read
            # before the exception so that a job failing in between is not
            # reported as "finished".
            state = "running" if job["run_thread"].is_alive() else "finished"

            # Find out whether the job succeeded
            if job["exception"] is not None:
                response = {
                    "code": 0,
                    "status": "failed",
                    "message": str(job["exception"]),
                    "errors": {},
                }
                return make_response(jsonify(response))

            # Form the response
            return make_response(
                jsonify(self._job_payload(job_id, job, state)), 201
            )

    def _execute_job(self, job_dic, programs, job_request):
        """
        Run every program of a job and record the outcome in ``job_dic``.

        Parameters
        ----------
        job_dic : dict
            The job record. Counts are appended to ``job_dic["counts"]``, a
            raised exception is stored in ``job_dic["exception"]`` and
            ``job_dic["end_time"]`` is set when execution stops.
        programs : list
            The deployed programs; each entry provides a ``"quantumCircuit"``.
        job_request : dict
            The job request data providing ``"shots"`` and ``"token"``.
        """
        try:
            for program in programs:
                counts_res = self.run_func(
                    program["quantumCircuit"],
                    job_request["shots"],
                    job_request["token"],
                )
                job_dic["counts"].append(counts_res)
        except Exception as e:
            # Store the failure so the job query can report it instead of
            # the worker thread dying silently
            job_dic["exception"] = e
        finally:
            job_dic["end_time"] = str(datetime.now())

    def _job_payload(self, job_id, job, state):
        """
        Build the response dictionary describing a job that has not failed.

        Parameters
        ----------
        job_id : int
            The index of the job in the jobs attribute.
        job : dict
            The job record as created when the job was posted.
        state : str
            The state to report, i.e. ``"running"`` or ``"finished"``.

        Returns
        -------
        dict
            The response data, with one result entry per available counts
            dictionary.
        """
        # Copy the counts since the worker thread may still be appending
        counts_list = list(job["counts"])

        results = [
            {
                "id": 0,
                "circuit": circuit,
                "results": counts,
                "resultType": "COUNTS",
                "metaData": {},
            }
            for circuit, counts in zip(job["quantumCircuits"], counts_list)
        ]

        return {
            "id": job_id,
            "executedBy": {"id": 0, "name": ""},
            "executedOn": {
                "id": 0,
                "numQubits": -1,
                "isSimulator": self.is_simulator,
                "isLocal": True,
                "provider": {
                    "id": 0,
                    "withToken": True,
                    "supportedLanguages": {
                        "id": 0,
                        "providerId": "string",
                        "name": "",
                    },
                    "name": "",
                },
            },
            "progress": 0,
            "state": state,
            "type": "",
            "startedAt": job["start_time"],
            "finishedAt": job["end_time"],
            "data": "",
            "results": results,
            "parameters": "",
        }

    def start(self):
        """
        Start the server in a daemon thread and block until it answers.

        Raises
        ------
        RuntimeError
            If the server thread terminates before the server answered a
            request (for instance because the port could not be bound).
        """
        import time

        import requests
        from waitress import serve

        # Set up the thread to run the server
        def wrapper():
            serve(self.app, host=self.ip_address, port=self.port)

        thr = threading.Thread(target=wrapper, daemon=True)

        # Start the thread
        thr.start()

        url = "http://" + self.ip_address + ":" + str(self.port) + "/jobs/0/"

        # Hold program until the server answers. Any HTTP answer (even a 404)
        # means the server is up.
        while True:
            try:
                requests.get(url, timeout=5)
            except requests.exceptions.RequestException:
                # Without this check a crashed server thread would make this
                # loop spin forever
                if not thr.is_alive():
                    raise RuntimeError(
                        "The server thread terminated before the server "
                        "became reachable"
                    )
                # Short pause instead of hammering the socket in a busy loop
                time.sleep(0.01)
                continue
            break