Driver API Reference

Warning

This project is still evolving, so these docs may be incomplete or out-of-date.

Driver

class jumpstarter.driver.Driver(*, uuid: ~uuid.UUID = <factory>, labels: dict[str, str] = <factory>, children: dict[str, Driver] = <factory>)

Base class for drivers

At a minimum, a driver must implement the abstract client class method.

Regular or streaming driver calls can be marked with the export decorator. Raw stream constructors can be marked with the exportstream decorator.

abstract classmethod client() → str

Return full import path of the corresponding driver client class
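As an illustration of what a "full import path" string allows, the sketch below resolves a dotted path to a class using only the standard library. The helper name resolve_class is hypothetical, and this is not necessarily how Jumpstarter itself loads client classes; it only shows the general technique.

```python
from importlib import import_module


def resolve_class(path: str) -> type:
    """Resolve a dotted path such as "package.module.ClassName" to a class.

    Hypothetical helper for illustration; not part of the Jumpstarter API.
    """
    module_name, _, class_name = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted import path, got {path!r}")
    obj = getattr(import_module(module_name), class_name)
    if not isinstance(obj, type):
        raise TypeError(f"{path!r} does not refer to a class")
    return obj


# Demonstrated with a standard-library class so the snippet runs anywhere.
ordered_dict_cls = resolve_class("collections.OrderedDict")
```

A path with no module part, such as "ExampleClient", is rejected here with ValueError, which is one reason the string has to be the full path rather than the bare class name.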

resources: dict[UUID, Any]

Dict of client-side resources

@jumpstarter.driver.export

Decorator for exporting method as driver call

@jumpstarter.driver.exportstream

Decorator for exporting method as stream
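To make the idea of "marking" a method concrete, here is a minimal sketch of a marker decorator and a name-based dispatcher. Everything in it (toy_export, EXPORT_MARKER, ToyDriver, dispatch) is hypothetical and uses only the standard library; the real export and exportstream decorators may record their metadata quite differently.

```python
# Hypothetical marker attribute; the real library may record exports differently.
EXPORT_MARKER = "_toy_exported"


def toy_export(func):
    """Tag a function so a dispatcher can tell it may be called by name."""
    setattr(func, EXPORT_MARKER, True)
    return func


class ToyDriver:
    @toy_export
    def echo(self, message: str) -> str:
        return message

    def internal_helper(self) -> None:
        """Not tagged, so the dispatcher below refuses to call it."""

    def dispatch(self, method: str, *args):
        """Look up a method by name and call it only if it was tagged."""
        func = getattr(self, method, None)
        if func is None or not getattr(func, EXPORT_MARKER, False):
            raise AttributeError(f"{method!r} is not an exported method")
        return func(*args)


result = ToyDriver().dispatch("echo", "hello")
```

The point of the pattern is that only explicitly tagged methods are reachable by name, so helper methods on the driver stay private to it.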

Driver Client

class jumpstarter.client.DriverClient(*, uuid: ~uuid.UUID = <factory>, labels: dict[str, str] = <factory>, channel: ~grpc.aio._base_channel.Channel, children: dict[str, DriverClient] = <factory>, portal: BlockingPortal)

Base class for driver clients

Client methods can be implemented as regular functions, and call the call or streamingcall helpers internally to invoke exported methods on the driver.

Additional client functionality, such as raw stream connections or sharing client-side resources, can be added by inheriting from the mixin classes under jumpstarter.drivers.mixins.

call(method, *args)

Invoke driver call

Parameters:
  • method (str) – method name of driver call

  • args (list[Any]) – arguments for driver call

Returns:

driver call result

Return type:

Any

close()

Close the open stream session without a context manager.

open_stream() → BlockingStream

Open a blocking stream session without a context manager.

Returns:

blocking stream session object.

Return type:

BlockingStream

stream(method='connect')

Open a blocking stream session with a context manager.

Parameters:

method (str) – method name of streaming driver call

Returns:

blocking stream session object context manager.
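The relationship between stream() on one side and the open_stream() / close() pair on the other can be sketched with contextlib alone. ToyStream and ToyStreamClient below are hypothetical stand-ins, not the real BlockingStream or DriverClient, and the use of ExitStack is an assumption about one reasonable way to build the manual pair on top of a context manager.

```python
from contextlib import ExitStack, contextmanager


class ToyStream:
    """Hypothetical stand-in for a stream session object."""

    def __init__(self) -> None:
        self.closed = False


class ToyStreamClient:
    """Hypothetical client showing both ways to manage a session."""

    def __init__(self) -> None:
        self._stack = ExitStack()

    @contextmanager
    def stream(self, method: str = "connect"):
        session = ToyStream()
        try:
            yield session
        finally:
            # Runs on normal exit and on exceptions alike.
            session.closed = True

    def open_stream(self) -> ToyStream:
        # Enter the context manager now and defer its exit until close().
        return self._stack.enter_context(self.stream())

    def close(self) -> None:
        self._stack.close()


client = ToyStreamClient()

# Context-manager style: cleanup is automatic.
with client.stream() as managed:
    managed_open_inside = not managed.closed

# Manual style: the caller is responsible for calling close().
manual = client.open_stream()
try:
    manual_open_inside = not manual.closed
finally:
    client.close()
```

The context-manager form is generally the safer choice because cleanup cannot be forgotten; the manual pair suits cases where the session has to outlive a single block, such as an interactive shell.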

streamingcall(method, *args)

Invoke streaming driver call

Parameters:
  • method (str) – method name of streaming driver call

  • args (list[Any]) – arguments for streaming driver call

Returns:

streaming driver call result

Return type:

Generator[Any, None, None]
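Because the return type is a generator, results can be consumed one at a time or collected in bulk. The sketch below uses a hypothetical stand-in function, toy_streamingcall, rather than a real DriverClient, so it says nothing about how Jumpstarter delivers results over the wire; it only shows the two usual ways of consuming a Generator.

```python
from collections.abc import Generator
from itertools import islice


def toy_streamingcall(method: str, *args) -> Generator[str, None, None]:
    """Hypothetical stand-in that yields ten results one at a time."""
    (message,) = args
    for index in range(10):
        yield f"{method}[{index}]: {message}"


# Take only the first three results; the stand-in never produces the rest.
first_three = list(islice(toy_streamingcall("echo_generator", "hello"), 3))

# Or drain the generator fully into a list.
everything = list(toy_streamingcall("echo_generator", "hello"))
```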

Example

from anyio import connect_tcp, sleep
from contextlib import asynccontextmanager
from collections.abc import Generator
from jumpstarter.driver import Driver, export, exportstream
from jumpstarter.client import DriverClient
from jumpstarter.common.utils import serve

class ExampleDriver(Driver):
    @classmethod
    def client(cls) -> str:
        return "example.ExampleClient"

    @export
    def echo(self, message) -> str:
        return message

    # driver calls can be either sync or async
    @export
    async def echo_async(self, message) -> str:
        await sleep(5)
        return message

    @export
    def echo_generator(self, message) -> Generator[str, None, None]:
        for _ in range(10):
            yield message

    # a stream constructor has to be an AsyncContextManager
    # that yields an anyio.abc.ObjectStream
    @exportstream
    @asynccontextmanager
    async def connect_tcp(self):
        async with await connect_tcp(remote_host="example.com", remote_port=80) as stream:
            yield stream

class ExampleClient(DriverClient):
    # client methods are sync
    def echo(self, message) -> str:
        return self.call("echo", message)
        # async driver methods can be invoked the same way
        # return self.call("echo_async", message)

    def echo_generator(self, message) -> Generator[str, None, None]:
        yield from self.streamingcall("echo_generator", message)

with serve(ExampleDriver()) as client:
    print(client.echo("hello"))  # prints: hello
    assert list(client.echo_generator("hello")) == ["hello"] * 10