Driver API Reference
Warning
This project is still evolving, so these docs may be incomplete or out-of-date.
Driver
- class jumpstarter.driver.Driver(*, uuid: uuid.UUID = <factory>, labels: dict[str, str] = <factory>, children: dict[str, Driver] = <factory>)
Base class for drivers.
At a minimum, a driver must implement the client classmethod.
Regular or streaming driver calls can be marked with the export decorator. Raw stream constructors can be marked with the exportstream decorator.
- abstract classmethod client() -> str
Return the full import path of the corresponding driver client class.
- resources: dict[UUID, Any]
Dict of client-side resources.
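The client classmethod above returns a dotted import path rather than a class object. How jumpstarter itself turns that string into a class is not shown on this page; the sketch below only illustrates one common way to resolve such a path with the standard library. The helper name resolve_import_path is hypothetical and not part of jumpstarter.

```python
from importlib import import_module


def resolve_import_path(path: str) -> type:
    """Resolve a dotted path such as "package.module.ClassName" to a class.

    Hypothetical helper for illustration only; not a jumpstarter API.
    """
    # Split "package.module.ClassName" into the module part and the class name.
    module_name, _, class_name = path.rpartition(".")
    if not module_name:
        raise ValueError(f"not a full import path: {path!r}")
    # Import the module, then look the class up as a module attribute.
    return getattr(import_module(module_name), class_name)


# Demonstrated with a standard-library class so the sketch runs anywhere.
ordered_dict_cls = resolve_import_path("collections.OrderedDict")
print(ordered_dict_cls.__name__)
```

A path with no module part, such as "ExampleClient", is rejected by this helper, which is why client is documented as returning the full import path.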
- @jumpstarter.driver.export
Decorator for exporting a method as a driver call.
- @jumpstarter.driver.exportstream
Decorator for exporting a method as a stream.
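To make the idea of marking methods for export concrete, here is a toy model of a marker decorator and a name-based dispatcher. It is an assumption-laden sketch, not jumpstarter's implementation: toy_export, MARKER, ToyDriver, and dispatch are all hypothetical names, and the real export decorator may work quite differently.

```python
MARKER = "_toy_exported"  # hypothetical attribute name used only in this sketch


def toy_export(func):
    """Mark a function as callable by name; the function itself is unchanged."""
    setattr(func, MARKER, True)
    return func


class ToyDriver:
    @toy_export
    def echo(self, message):
        return message

    def internal(self):
        # Not marked, so the dispatcher below refuses to call it.
        return "private"


def dispatch(driver, method, *args):
    """Call a marked method by name, rejecting anything that is not marked."""
    func = getattr(driver, method, None)
    if func is None or not getattr(func, MARKER, False):
        raise AttributeError(f"{method!r} is not an exported method")
    return func(*args)


print(dispatch(ToyDriver(), "echo", "hello"))
```

The point of the pattern is that only explicitly marked methods are reachable by name, so helper methods on a driver are not exposed by accident.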
Driver Client
- class jumpstarter.client.DriverClient(*, uuid: uuid.UUID = <factory>, labels: dict[str, str] = <factory>, channel: grpc.aio._base_channel.Channel, children: dict[str, DriverClient] = <factory>, portal: BlockingPortal)
Base class for driver clients.
Client methods can be implemented as regular functions that use the call or streamingcall helpers internally to invoke exported methods on the driver.
Additional client functionality, such as raw stream connections or sharing client-side resources, can be added by inheriting from the mixin classes under jumpstarter.drivers.mixins.
- call(method, *args)
Invoke a driver call.
- Parameters:
method (str) – method name of driver call
args (list[Any]) – arguments for driver call
- Returns:
driver call result
- Return type:
Any
- close()
Close the open stream session without a context manager.
- open_stream() -> BlockingStream
Open a blocking stream session without a context manager.
- Returns:
blocking stream session object.
- Return type:
BlockingStream
- stream(method='connect')
Open a blocking stream session with a context manager.
- Parameters:
method (str) – method name of streaming driver call
- Returns:
blocking stream session object context manager.
- streamingcall(method, *args)
Invoke a streaming driver call.
- Parameters:
method (str) – method name of streaming driver call
args (list[Any]) – arguments for streaming driver call
- Returns:
streaming driver call result
- Return type:
Generator[Any, None, None]
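The reference above offers two ways to manage a stream session: stream as a context manager, or the manual open_stream and close pair. The toy model below sketches how those two styles typically relate, using only the standard library. ToyStream and ToyClient are hypothetical stand-ins; nothing here reflects how DriverClient or BlockingStream are actually implemented.

```python
from contextlib import contextmanager


class ToyStream:
    """Hypothetical stand-in for a blocking stream session."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class ToyClient:
    def __init__(self):
        self._stream = None

    def open_stream(self):
        # Manual style: the caller is responsible for calling close() later.
        self._stream = ToyStream()
        return self._stream

    def close(self):
        if self._stream is not None:
            self._stream.close()
            self._stream = None

    @contextmanager
    def stream(self, method="connect"):
        # Context-manager style: cleanup runs even if the body raises.
        session = self.open_stream()
        try:
            yield session
        finally:
            self.close()


client = ToyClient()
with client.stream() as session:
    print(session.closed)  # still open inside the block
print(session.closed)      # closed once the block exits
```

The context-manager form is usually the safer default, since the session is closed even when the body raises; the manual pair is useful when the session has to outlive a single block.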
Example
from collections.abc import Generator
from contextlib import asynccontextmanager

from anyio import connect_tcp, sleep

from jumpstarter.client import DriverClient
from jumpstarter.common.utils import serve
from jumpstarter.driver import Driver, export, exportstream


class ExampleDriver(Driver):
    @classmethod
    def client(cls) -> str:
        # full import path of the matching client class
        return "example.ExampleClient"

    @export
    def echo(self, message) -> str:
        return message

    # driver calls can be either sync or async
    @export
    async def echo_async(self, message) -> str:
        await sleep(5)
        return message

    # a generator method is exported as a streaming driver call
    @export
    def echo_generator(self, message) -> Generator[str, None, None]:
        for _ in range(10):
            yield message

    # a stream constructor has to be an AsyncContextManager
    # that yields an anyio.abc.ObjectStream
    @exportstream
    @asynccontextmanager
    async def connect_tcp(self):
        # inside the method body, connect_tcp refers to the anyio function
        async with await connect_tcp(remote_host="example.com", remote_port=80) as stream:
            yield stream


class ExampleClient(DriverClient):
    # client methods are sync
    def echo(self, message) -> str:
        return self.call("echo", message)
        # async driver methods can be invoked the same way
        # return self.call("echo_async", message)

    def echo_generator(self, message) -> Generator[str, None, None]:
        yield from self.streamingcall("echo_generator", message)


with serve(ExampleDriver()) as client:
    print(client.echo("hello"))
    assert list(client.echo_generator("hello")) == ["hello"] * 10
hello