-
Notifications
You must be signed in to change notification settings - Fork 159
/
session.py
363 lines (301 loc) · 13.9 KB
/
session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# This code is part of Qiskit.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Qiskit Runtime flexible session."""
from __future__ import annotations
from typing import Dict, Optional, Type, Union, Callable, Any
from types import TracebackType
from functools import wraps
from qiskit.providers.backend import BackendV1, BackendV2
from qiskit_ibm_runtime import QiskitRuntimeService
from .exceptions import IBMInputValueError, IBMRuntimeError
from .runtime_job import RuntimeJob
from .runtime_job_v2 import RuntimeJobV2
from .utils.result_decoder import ResultDecoder
from .ibm_backend import IBMBackend
from .utils.default_session import set_cm_session
from .utils.converters import hms_to_seconds
from .fake_provider.local_service import QiskitRuntimeLocalService
def _active_session(func):  # type: ignore
    """Guard a session method so that it only runs while the session is active.

    Args:
        func: Method to protect. It is invoked as ``func(self, *args, **kwargs)``.

    Returns:
        A wrapper carrying ``func``'s metadata. The wrapper forwards the call
        unchanged when ``self._active`` is truthy and raises ``IBMRuntimeError``
        otherwise.
    """

    @wraps(func)
    def _guarded(self, *call_args, **call_kwargs):  # type: ignore
        # Happy path first: an active session simply delegates to the method.
        if self._active:
            return func(self, *call_args, **call_kwargs)
        raise IBMRuntimeError("The session is closed.")

    return _guarded
class Session:
    """Class for creating a Qiskit Runtime session.

    A Qiskit Runtime ``session`` allows you to group a collection of iterative calls to
    the quantum computer. A session is started when the first job within the session
    is started. Subsequent jobs within the session are prioritized by the scheduler.

    You can open a Qiskit Runtime session using this ``Session`` class and submit jobs
    to one or more primitives.

    For example::

        from qiskit.circuit import QuantumCircuit, QuantumRegister, ClassicalRegister
        from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager
        from qiskit_ibm_runtime import QiskitRuntimeService, Session, SamplerV2 as Sampler

        service = QiskitRuntimeService()
        backend = service.least_busy(operational=True, simulator=False)

        # Bell Circuit
        qr = QuantumRegister(2, name="qr")
        cr = ClassicalRegister(2, name="cr")
        qc = QuantumCircuit(qr, cr, name="bell")
        qc.h(qr[0])
        qc.cx(qr[0], qr[1])
        qc.measure(qr, cr)

        pm = generate_preset_pass_manager(backend=backend, optimization_level=1)
        isa_circuit = pm.run(qc)

        with Session(backend=backend) as session:
            sampler = Sampler(mode=session)
            job = sampler.run([isa_circuit])
            pub_result = job.result()[0]
            print(f"Sampler job ID: {job.job_id()}")
            print(f"Counts: {pub_result.data.cr.get_counts()}")
    """

    # Read by ``_create_session``. ``from_id`` temporarily sets it to False so that
    # constructing the object attaches to an existing session instead of opening a new one.
    _create_new_session = True

    def __init__(
        self,
        backend: Optional[Union[BackendV1, BackendV2]] = None,
        max_time: Optional[Union[int, str]] = None,
    ):  # pylint: disable=line-too-long
        """Session constructor.

        Args:
            backend: Instance of ``Backend`` class.
            max_time:
                Maximum amount of time, a runtime session can be open before being
                forcibly closed. Can be specified as seconds (int) or a string like "2h 30m 40s".
                This value must be less than the
                `system imposed maximum
                <https://docs.quantum.ibm.com/guides/max-execution-time>`_.

        Raises:
            ValueError: If an input value is invalid.
        """
        self._service: Optional[QiskitRuntimeService | QiskitRuntimeLocalService] = None
        self._backend: Optional[BackendV1 | BackendV2] = None
        self._instance = None
        self._active = True
        self._session_id = None

        if isinstance(backend, IBMBackend):
            # IBM backends carry the service they were obtained from.
            self._service = backend.service
            self._backend = backend
        elif isinstance(backend, (BackendV1, BackendV2)):
            # Any other Qiskit backend is driven through the local service.
            self._service = QiskitRuntimeLocalService()
            self._backend = backend
        else:
            raise ValueError(f"Invalid backend type {type(backend)}")

        # Integers (and None) are stored as given; strings are converted to seconds.
        self._max_time = (
            max_time
            if max_time is None or isinstance(max_time, int)
            else hms_to_seconds(max_time, "Invalid max_time value: ")
        )

        # ``_max_time`` must be set before this point: ``_create_session`` reads it.
        if isinstance(self._backend, IBMBackend):
            self._instance = self._backend._instance
            if not self._backend.configuration().simulator:
                self._session_id = self._create_session()

    def _create_session(self) -> Optional[str]:
        """Create a session.

        Returns:
            The ``id`` field of the create-session response, or ``None`` when the
            service is not a ``QiskitRuntimeService`` or session creation is
            currently suppressed through ``_create_new_session``.
        """
        # Look the flag up through ``self`` so that a value set on the concrete class by
        # ``from_id`` (which writes ``cls._create_new_session``) is honored for subclasses.
        if isinstance(self._service, QiskitRuntimeService) and self._create_new_session:
            session = self._service._api_client.create_session(
                self.backend(), self._instance, self._max_time, self._service.channel, "dedicated"
            )
            return session.get("id")
        return None

    @_active_session
    def _run(
        self,
        program_id: str,
        inputs: Dict,
        options: Optional[Dict] = None,
        callback: Optional[Callable] = None,
        result_decoder: Optional[Type[ResultDecoder]] = None,
    ) -> Union[RuntimeJob, RuntimeJobV2]:
        """Run a program in the session.

        Args:
            program_id: Program ID.
            inputs: Program input parameters. These input values are passed
                to the runtime program.
            options: Runtime options that control the execution environment.
                When a non-empty dict is supplied it is updated in place: ``"backend"``
                is always set, and ``"instance"`` is set if it is not already present.
            callback: Callback function to be invoked for any interim results and final result.
                Only forwarded when the service is a ``QiskitRuntimeService``.
            result_decoder: Decoder class forwarded to the service together with the job
                request. Only forwarded when the service is a ``QiskitRuntimeService``.

        Returns:
            Submitted job.
        """
        options = options or {}
        if "instance" not in options:
            options["instance"] = self._instance
        options["backend"] = self._backend

        if isinstance(self._service, QiskitRuntimeService):
            job = self._service._run(
                program_id=program_id,  # type: ignore[arg-type]
                options=options,
                inputs=inputs,
                session_id=self._session_id,
                start_session=False,
                callback=callback,
                result_decoder=result_decoder,
            )
            if self._backend is None:
                self._backend = job.backend()
        else:
            # The non-``QiskitRuntimeService`` path takes only the basic arguments.
            job = self._service._run(  # type: ignore[call-arg]
                program_id=program_id,  # type: ignore[arg-type]
                options=options,
                inputs=inputs,
            )
        return job

    def cancel(self) -> None:
        """Cancel all pending jobs in a session."""
        # Mark the object inactive first so ``_run`` rejects further submissions.
        self._active = False
        if self._session_id and isinstance(self._service, QiskitRuntimeService):
            self._service._api_client.cancel_session(self._session_id)

    def close(self) -> None:
        """Close the session so new jobs will no longer be accepted, but existing
        queued or running jobs will run to completion. The session will be terminated once there
        are no more pending jobs."""
        # Mark the object inactive first so ``_run`` rejects further submissions.
        self._active = False
        if self._session_id and isinstance(self._service, QiskitRuntimeService):
            self._service._api_client.close_session(self._session_id)

    def backend(self) -> Optional[str]:
        """Return backend for this session.

        Returns:
            Backend for this session. None if unknown.
        """
        if self._backend:
            # ``name`` is read as an attribute when ``version == 2`` and called otherwise.
            return self._backend.name if self._backend.version == 2 else self._backend.name()
        return None

    def status(self) -> Optional[str]:
        """Return current session status.

        Returns:
            Session status as a string.

            * ``Pending``: Session is created but not active.
              It will become active when the next job of this session is dequeued.
            * ``In progress, accepting new jobs``: session is active and accepting new jobs.
            * ``In progress, not accepting new jobs``: session is active and not accepting new jobs.
            * ``Closed``: max_time expired or session was explicitly closed.
            * ``None``: status details are not available.
        """
        details = self.details()
        if not details:
            return None

        state = details["state"]
        accepting_jobs = details["accepting_jobs"]
        if state in ["open", "inactive"]:
            return "Pending"
        if (state == "active" and accepting_jobs) or state == "pending_inactive":
            return "In progress, accepting new jobs"
        if (state == "active" and not accepting_jobs) or state == "pending_closed":
            return "In progress, not accepting new jobs"
        # Any other state is reported capitalized (e.g. "closed" -> "Closed"). A missing
        # state means the status is not available, so report None instead of failing.
        return state.capitalize() if state else None

    def usage(self) -> Optional[float]:
        """Return session usage in seconds.

        Session usage is the time from when the first job starts until the session goes inactive,
        is closed, or when its last job completes, whichever happens last.

        Batch usage is the amount of time all jobs spend on the QPU.

        Returns:
            The ``elapsed_time`` field of the session details. ``None`` if there is no
            session ID, the service is not a ``QiskitRuntimeService``, or no details
            were returned.
        """
        if self._session_id and isinstance(self._service, QiskitRuntimeService):
            response = self._service._api_client.session_details(self._session_id)
            if response:
                return response.get("elapsed_time")
        return None

    def details(self) -> Optional[Dict[str, Any]]:
        """Return session details.

        Returns:
            A dictionary with the sessions details. ``None`` if there is no session ID,
            the service is not a ``QiskitRuntimeService``, or no details were returned.

            * ``id``: id of the session.
            * ``backend_name``: backend used for the session.
            * ``interactive_timeout``: The maximum idle time (in seconds) between jobs that
              is allowed to occur before the session is deactivated.
            * ``max_time``: Maximum allowed time (in seconds) for the session, subject to plan limits.
            * ``active_timeout``: The maximum time (in seconds) a session can stay active.
            * ``state``: State of the session - open, active, inactive, or closed.
            * ``accepting_jobs``: Whether or not the session is accepting jobs.
            * ``last_job_started``: Timestamp of when the last job in the session started.
            * ``last_job_completed``: Timestamp of when the last job in the session completed.
            * ``started_at``: Timestamp of when the session was started.
            * ``closed_at``: Timestamp of when the session was closed.
            * ``activated_at``: Timestamp of when the session state was changed to active.
            * ``mode``: Execution mode of the session.
            * ``usage_time``: The usage time, in seconds, of this Session or Batch.
              Usage is defined as the time a quantum system is committed to complete a job.
        """
        if self._session_id and isinstance(self._service, QiskitRuntimeService):
            response = self._service._api_client.session_details(self._session_id)
            if response:
                # Some response fields are exposed under different public names
                # (``*_ttl`` -> ``*_timeout`` / ``max_time``, ``elapsed_time`` -> ``usage_time``).
                return {
                    "id": response.get("id"),
                    "backend_name": response.get("backend_name"),
                    "interactive_timeout": response.get("interactive_ttl"),
                    "max_time": response.get("max_ttl"),
                    "active_timeout": response.get("active_ttl"),
                    "state": response.get("state"),
                    "accepting_jobs": response.get("accepting_jobs"),
                    "last_job_started": response.get("last_job_started"),
                    "last_job_completed": response.get("last_job_completed"),
                    "started_at": response.get("started_at"),
                    "closed_at": response.get("closed_at"),
                    "activated_at": response.get("activated_at"),
                    "mode": response.get("mode"),
                    "usage_time": response.get("elapsed_time"),
                }
        return None

    @property
    def session_id(self) -> Optional[str]:
        """Return the session ID.

        Returns:
            Session ID. None if the backend is a simulator.
        """
        return self._session_id

    @property
    def service(self) -> QiskitRuntimeService:
        """Return service associated with this session.

        Returns:
            :class:`qiskit_ibm_runtime.QiskitRuntimeService` associated with this session.
            For a backend that is not an ``IBMBackend`` this is the
            ``QiskitRuntimeLocalService`` created by the constructor.
        """
        return self._service

    @classmethod
    def from_id(cls, session_id: str, service: QiskitRuntimeService) -> "Session":
        """Construct a Session object with a given session_id

        Args:
            session_id: the id of the session to be created. This must be an already
                existing session id.
            service: instance of the ``QiskitRuntimeService`` class.

        Raises:
            IBMInputValueError: If the execution mode of the session identified by
                ``session_id`` does not match this class.

        Returns:
            A new Session with the given ``session_id``
        """
        response = service._api_client.session_details(session_id)
        backend = service.backend(response.get("backend_name"))
        mode = response.get("mode")
        state = response.get("state")

        # ``Session`` corresponds to the "dedicated" mode; any other class is matched
        # against its own lower-cased name.
        class_name = "dedicated" if cls.__name__.lower() == "session" else cls.__name__.lower()
        if mode != class_name:
            raise IBMInputValueError(
                f"Input ID {session_id} has execution mode {mode} instead of {class_name}."
            )

        # Suppress session creation while constructing the object, and always restore the
        # flag: without ``finally`` a failing constructor would leave creation disabled
        # for every session built afterwards.
        cls._create_new_session = False
        try:
            session = cls(backend)
        finally:
            cls._create_new_session = True

        if state == "closed":
            session._active = False
        session._session_id = session_id
        return session

    def __enter__(self) -> "Session":
        """Register this session via ``set_cm_session`` and return it."""
        set_cm_session(self)
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Clear the registration made in ``__enter__`` and close the session."""
        set_cm_session(None)
        self.close()