Module nux.session

Session and its asynchronous API for model inference
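
For illustration, a minimal blocking-inference sketch; the model path and
the input shape are placeholders that depend on your model:

import numpy as np
import nux.session as session

# Create a session from a model image file and run one prediction.
# "model.onnx" and the (1, 1, 28, 28) shape are hypothetical.
with session.create("model.onnx") as sess:
    inputs = np.zeros((1, 1, 28, 28), dtype=np.float32)
    outputs = sess.run(inputs)  # blocking call; returns a TensorArray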

Expand source code
"""Session and its asynchronous API for model inference"""

import ctypes
from ctypes import c_void_p, byref, c_int32
from typing import List, Tuple, Union
import io

import onnx
import numpy as np

from nux.errors import into_exception, is_ok, UnsupportedTensorType, is_err
from ._api import LIBNUX
from ._api.v1 import increase_ref_count, decref
from .model import Model, TensorArray
from .tensor import TensorDesc


def _fill_tensors(values: Union[np.ndarray, np.generic, TensorArray],
                  targets: TensorArray) -> TensorArray:
    """
    Fills `targets` with buffers copied from `values`
    """
    if isinstance(values, (np.ndarray, np.generic)):
        targets[0].copy_from(values)
        return targets

    if isinstance(values, list):
        for idx, value in enumerate(values):
            targets[idx].copy_from(value)
        return targets

    if isinstance(values, TensorArray):
        return values

    raise UnsupportedTensorType()


class Session(Model):
    """Provides a blocking API to run an inference task with a given model"""
    ref = c_void_p(None)

    def __init__(self, model):
        sess = c_void_p(None)
        options: c_void_p = LIBNUX.nux_session_option_create()

        model_image = _model_image(model)

        err = LIBNUX.nux_session_create(model_image, len(model_image), options, byref(sess))
        if is_err(err):
            raise into_exception(err)

        self.ref = sess
        self._as_parameter_ = self.ref

        super().__init__()

    def _get_model_ref(self) -> c_void_p:
        return LIBNUX.nux_session_get_model(self)

    def run(self, inputs) -> TensorArray:
        """
        Runs an inference task with `inputs`

        Args:
            inputs: a single numpy.ndarray object, a list of \
            numpy.ndarray objects, or a TensorArray

        Returns:
            Inference output
        """
        _inputs = self.allocate_inputs()
        outputs = self.create_outputs()
        _inputs = _fill_tensors(inputs, _inputs)

        err = LIBNUX.nux_session_run(self.ref, _inputs, outputs)

        if is_err(err):
            raise into_exception(err)

        return outputs

    def close(self):
        """Close the session and release all resources belonging to the session"""
        if self.ref:
            LIBNUX.nux_session_destroy(self.ref)
            self.ref = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __del__(self):
        self.close()


class CompletionQueue:
    """Receives the completion results asynchronously from AsyncSession"""
    ref = c_void_p(None)
    context_ty: type
    output_descs: List[TensorDesc]

    def __init__(self, ref: c_void_p, context_ty: type, output_descs: List[TensorDesc]):
        self._as_parameter_ = ref
        self.ref = ref
        self.context_ty = context_ty
        self.output_descs = output_descs
        self.queue_ok = True

    def recv(self) -> Tuple[object, TensorArray]:
        """Receives the prediction results asynchronously coming from AsyncSession

        If prediction outputs are already available, it returns immediately.
        Otherwise, it blocks until the next result is ready.

        Returns:
            A tuple, whose first value is the context value passed \
            when you submit an inference task and the second value \
            is inference output.
        """
        err = c_int32(0)
        context_ref = ctypes.py_object(None)
        outputs_ref = c_void_p(None)

        self.queue_ok = LIBNUX.nux_completion_queue_next(self.ref,
                                                         byref(context_ref),
                                                         byref(outputs_ref),
                                                         byref(err))
        context_val = context_ref.value
        decref(context_ref)

        if is_ok(err.value):
            return context_val, TensorArray(outputs_ref, self.output_descs, allocated=False)

        raise into_exception(err.value)

    def close(self):
        """Closes this completion queue.

        Once the queue is closed, the associated AsyncSession will also stop working.
        """
        if self.ref:
            LIBNUX.nux_completion_queue_destroy(self.ref)
            self.ref = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __del__(self):
        self.close()

    def __iter__(self):
        return self

    def __next__(self):
        if self.queue_ok:
            return self.recv()

        raise StopIteration()


class AsyncSession(Model):
    """An asynchronous session for a given model allows to submit predictions"""
    ref = c_void_p(None)
    inputs: TensorArray

    def __init__(self, ref: c_void_p):
        self.ref = ref
        self._as_parameter_ = self.ref
        super().__init__()

        self.inputs = self.allocate_inputs()

    def _get_model_ref(self) -> c_void_p:
        return LIBNUX.nux_async_session_get_model(self)

    def submit(self, values: Union[np.ndarray, np.generic, TensorArray],
               context: object = None) -> None:
        """
        Submit a prediction request

        It returns immediately without blocking the caller. Once the
        prediction is completed, the outputs are sent to the CompletionQueue.

        Args:
            values: Input values
            context: an additional context to identify the prediction request
        """
        _fill_tensors(values, self.inputs)
        # manually increase reference count to keep the context object while running
        increase_ref_count(context)
        err = LIBNUX.nux_async_session_run(self.ref, context, self.inputs)

        if is_err(err):
            raise into_exception(err)

    def close(self):
        """Closes this session

        After a session is closed, CompletionQueue will return an error
        if CompletionQueue.recv() is called.
        """
        if self.ref:
            LIBNUX.nux_async_session_destroy(self.ref)
            self.ref = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __del__(self):
        self.close()


def _read_file(path):
    with open(path, 'rb') as file:
        contents = file.read()
        return contents


def _model_image(model) -> bytes:
    if isinstance(model, bytes):
        model_image = model
    elif isinstance(model, str):
        model_image = _read_file(model)
    elif isinstance(model, onnx.ModelProto):
        model_io = io.BytesIO()
        onnx.save_model(model, model_io)
        model_io.seek(0)
        model_image = model_io.read()
    else:
        raise TypeError("'model' must be str, bytes, or onnx.ModelProto, but was " + repr(type(model)))

    return model_image


def create(model) -> Session:
    """Creates a session for a model

    Args:
        model (bytes, str, or onnx.ModelProto): a byte string containing a \
        model image, a path to a model image file, or an onnx.ModelProto object

    Returns:
        the session for the given model, which can run predictions. \
        Session is thread-safe.
    """
    return Session(model)


def create_async(model, context_ty: type = None) -> Tuple[AsyncSession, CompletionQueue]:
    """Creates a pair of the asynchronous session and the completion queue for a given model

    Args:
        model (bytes, str, or onnx.ModelProto): a byte string containing a \
        model image, a path to a model image file, or an onnx.ModelProto object

    Returns:
        A pair of the asynchronous session and the completion queue. \
        The asynchronous session allows users to submit prediction \
        requests, and the completion queue allows them to receive the \
        prediction outputs asynchronously.
    """

    model_image = _model_image(model)

    options: c_void_p = LIBNUX.nux_session_option_create()
    sess_ref = c_void_p(None)
    queue_ref = c_void_p(None)
    err = LIBNUX.nux_async_session_create(model_image, len(model_image), options,
                                          byref(sess_ref), byref(queue_ref))
    if is_ok(err):
        sess = AsyncSession(sess_ref)
        return sess, CompletionQueue(queue_ref, context_ty, sess.outputs())

    raise into_exception(err)

Functions

def create(model) ‑> Session

Creates a session for a model

Args

model : bytes, str, or onnx.ModelProto
a byte string containing a model image, a path to a model image file, or an onnx.ModelProto object

Returns

the session for the given model, which can run predictions. Session is thread-safe.
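
For a model with several inputs, a list of numpy.ndarray objects fills the
model inputs in order; a sketch with hypothetical model path and shapes:

import numpy as np
import nux.session as session

with session.create("two_input_model.onnx") as sess:
    image = np.zeros((1, 3, 224, 224), dtype=np.float32)
    mask = np.zeros((1, 1, 224, 224), dtype=np.float32)
    # The list entries are copied into the model's inputs in order.
    outputs = sess.run([image, mask])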

Expand source code
def create(model) -> Session:
    """Creates a session for a model

    Args:
        model (bytes, str, or onnx.ModelProto): a byte string containing a \
        model image, a path to a model image file, or an onnx.ModelProto object

    Returns:
        the session for the given model, which can run predictions. \
        Session is thread-safe.
    """
    return Session(model)
def create_async(model, context_ty: type = None) ‑> Tuple[AsyncSession, CompletionQueue]

Creates a pair of the asynchronous session and the completion queue for a given model

Args

model : bytes, str, or onnx.ModelProto
a byte string containing a model image, a path to a model image file, or an onnx.ModelProto object

Returns

A pair of the asynchronous session and the completion queue. The asynchronous session allows users to submit prediction requests, and the completion queue allows them to receive the prediction outputs asynchronously.
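
For illustration, an end-to-end asynchronous sketch; the model path, input
shape, and request count are placeholders:

import numpy as np
import nux.session as session

sess, queue = session.create_async("model.onnx")
try:
    for i in range(4):
        inputs = np.zeros((1, 1, 28, 28), dtype=np.float32)
        sess.submit(inputs, context=i)  # `i` identifies the request

    for _ in range(4):
        context, outputs = queue.recv()  # blocks until a result is ready
        print("request", context, "completed")
finally:
    sess.close()
    queue.close()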

Expand source code
def create_async(model, context_ty: type = None) -> Tuple[AsyncSession, CompletionQueue]:
    """Creates a pair of the asynchronous session and the completion queue for a given model

    Args:
        model (bytes, str, or onnx.ModelProto): a byte string containing a \
        model image, a path to a model image file, or an onnx.ModelProto object

    Returns:
        A pair of the asynchronous session and the completion queue. \
        The asynchronous session allows users to submit prediction \
        requests, and the completion queue allows them to receive the \
        prediction outputs asynchronously.
    """

    model_image = _model_image(model)

    options: c_void_p = LIBNUX.nux_session_option_create()
    sess_ref = c_void_p(None)
    queue_ref = c_void_p(None)
    err = LIBNUX.nux_async_session_create(model_image, len(model_image), options,
                                          byref(sess_ref), byref(queue_ref))
    if is_ok(err):
        sess = AsyncSession(sess_ref)
        return sess, CompletionQueue(queue_ref, context_ty, sess.outputs())

    raise into_exception(err)

Classes

class AsyncSession (ref: ctypes.c_void_p)

An asynchronous session for a given model, allowing users to submit prediction requests

Expand source code
class AsyncSession(Model):
    """An asynchronous session for a given model allows to submit predictions"""
    ref = c_void_p(None)
    inputs: TensorArray

    def __init__(self, ref: c_void_p):
        self.ref = ref
        self._as_parameter_ = self.ref
        super().__init__()

        self.inputs = self.allocate_inputs()

    def _get_model_ref(self) -> c_void_p:
        return LIBNUX.nux_async_session_get_model(self)

    def submit(self, values: Union[np.ndarray, np.generic, TensorArray],
               context: object = None) -> None:
        """
        Submit a prediction request

        It returns immediately without blocking the caller. Once the
        prediction is completed, the outputs are sent to the CompletionQueue.

        Args:
            values: Input values
            context: an additional context to identify the prediction request
        """
        _fill_tensors(values, self.inputs)
        # manually increase reference count to keep the context object while running
        increase_ref_count(context)
        err = LIBNUX.nux_async_session_run(self.ref, context, self.inputs)

        if is_err(err):
            raise into_exception(err)

    def close(self):
        """Closes this session

        After a session is closed, CompletionQueue will return an error
        if CompletionQueue.recv() is called.
        """
        if self.ref:
            LIBNUX.nux_async_session_destroy(self.ref)
            self.ref = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __del__(self):
        self.close()

Ancestors

Model

Class variables

var ref

Methods

def close(self)

Closes this session

After a session is closed, CompletionQueue will return an error if CompletionQueue.recv() is called.

Expand source code
def close(self):
    """Closes this session

    After a session is closed, CompletionQueue will return an error
    if CompletionQueue.recv() is called.
    """
    if self.ref:
        LIBNUX.nux_async_session_destroy(self.ref)
        self.ref = None
def submit(self, values: Union[numpy.ndarray, numpy.generic, TensorArray], context: object = None) ‑> NoneType

Submit a prediction request

It returns immediately without blocking the caller. Once the prediction is completed, the outputs are sent to the CompletionQueue.

Args

values
Input values
context
an additional context to identify the prediction request
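
The context can be any Python object; submit() increases its reference
count and recv() releases it once the result is taken from the queue.
A short sketch with hypothetical names:

import numpy as np
import nux.session as session

sess, queue = session.create_async("model.onnx")
# A dict tags the request so the matching result can be recognized
# when it arrives; "request_id" is a made-up key.
sess.submit(np.zeros((1, 1, 28, 28), dtype=np.float32),
            context={"request_id": 42})
context, _outputs = queue.recv()
print(context["request_id"])  # -> 42
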
Expand source code
def submit(self, values: Union[np.ndarray, np.generic, TensorArray],
           context: object = None) -> None:
    """
    Submit a prediction request

    It returns immediately without blocking the caller. Once the
    prediction is completed, the outputs are sent to the CompletionQueue.

    Args:
        values: Input values
        context: an additional context to identify the prediction request
    """
    _fill_tensors(values, self.inputs)
    # manually increase reference count to keep the context object while running
    increase_ref_count(context)
    err = LIBNUX.nux_async_session_run(self.ref, context, self.inputs)

    if is_err(err):
        raise into_exception(err)

Inherited members

class CompletionQueue (ref: ctypes.c_void_p, context_ty: type, output_descs: List[TensorDesc])

Receives the completion results asynchronously from AsyncSession

Expand source code
class CompletionQueue:
    """Receives the completion results asynchronously from AsyncSession"""
    ref = c_void_p(None)
    context_ty: type
    output_descs: List[TensorDesc]

    def __init__(self, ref: c_void_p, context_ty: type, output_descs: List[TensorDesc]):
        self._as_parameter_ = ref
        self.ref = ref
        self.context_ty = context_ty
        self.output_descs = output_descs
        self.queue_ok = True

    def recv(self) -> Tuple[object, TensorArray]:
        """Receives the prediction results asynchronously coming from AsyncSession

        If prediction outputs are already available, it returns immediately.
        Otherwise, it blocks until the next result is ready.

        Returns:
            A tuple, whose first value is the context value passed \
            when you submit an inference task and the second value \
            is inference output.
        """
        err = c_int32(0)
        context_ref = ctypes.py_object(None)
        outputs_ref = c_void_p(None)

        self.queue_ok = LIBNUX.nux_completion_queue_next(self.ref,
                                                         byref(context_ref),
                                                         byref(outputs_ref),
                                                         byref(err))
        context_val = context_ref.value
        decref(context_ref)

        if is_ok(err.value):
            return context_val, TensorArray(outputs_ref, self.output_descs, allocated=False)

        raise into_exception(err.value)

    def close(self):
        """Closes this completion queue.

        Once the queue is closed, the associated AsyncSession will also stop working.
        """
        if self.ref:
            LIBNUX.nux_completion_queue_destroy(self.ref)
            self.ref = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __del__(self):
        self.close()

    def __iter__(self):
        return self

    def __next__(self):
        if self.queue_ok:
            return self.recv()

        raise StopIteration()

Class variables

var context_ty : type
var output_descs : List[TensorDesc]
var ref

Methods

def close(self)

Closes this completion queue.

Once the queue is closed, the associated AsyncSession will also stop working.

Expand source code
def close(self):
    """Closes this completion queue.

    Once the queue is closed, the associated AsyncSession will also stop working.
    """
    if self.ref:
        LIBNUX.nux_completion_queue_destroy(self.ref)
        self.ref = None
def recv(self) ‑> Tuple[object, TensorArray]

Receives the prediction results asynchronously coming from AsyncSession

If prediction outputs are already available, it returns immediately. Otherwise, it blocks until the next result is ready.

Returns

A tuple, whose first value is the context value passed when you submit an inference task and the second value is inference output.
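
Since CompletionQueue implements the iterator protocol, results can also be
consumed with a for loop; a sketch with placeholder path and shapes:

import numpy as np
import nux.session as session

sess, queue = session.create_async("model.onnx")
for i in range(3):
    sess.submit(np.zeros((1, 1, 28, 28), dtype=np.float32), context=i)

received = 0
# Iteration yields (context, outputs) pairs; it stops once the queue
# reports that no further results will arrive.
for context, outputs in queue:
    received += 1
    if received == 3:
        break  # stop manually while the session is still open

sess.close()
queue.close()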

Expand source code
def recv(self) -> Tuple[object, TensorArray]:
    """Receives the prediction results asynchronously coming from AsyncSession

    If prediction outputs are already available, it returns immediately.
    Otherwise, it blocks until the next result is ready.

    Returns:
        A tuple, whose first value is the context value passed \
        when you submit an inference task and the second value \
        is inference output.
    """
    err = c_int32(0)
    context_ref = ctypes.py_object(None)
    outputs_ref = c_void_p(None)

    self.queue_ok = LIBNUX.nux_completion_queue_next(self.ref,
                                                     byref(context_ref),
                                                     byref(outputs_ref),
                                                     byref(err))
    context_val = context_ref.value
    decref(context_ref)

    if is_ok(err.value):
        return context_val, TensorArray(outputs_ref, self.output_descs, allocated=False)

    raise into_exception(err.value)
class Session (model)

Provides a blocking API to run an inference task with a given model

Expand source code
class Session(Model):
    """Provides a blocking API to run an inference task with a given model"""
    ref = c_void_p(None)

    def __init__(self, model):
        sess = c_void_p(None)
        options: c_void_p = LIBNUX.nux_session_option_create()

        model_image = _model_image(model)

        err = LIBNUX.nux_session_create(model_image, len(model_image), options, byref(sess))
        if is_err(err):
            raise into_exception(err)

        self.ref = sess
        self._as_parameter_ = self.ref

        super().__init__()

    def _get_model_ref(self) -> c_void_p:
        return LIBNUX.nux_session_get_model(self)

    def run(self, inputs) -> TensorArray:
        """
        Runs an inference task with `inputs`

        Args:
            inputs: a single numpy.ndarray object, a list of \
            numpy.ndarray objects, or a TensorArray

        Returns:
            Inference output
        """
        _inputs = self.allocate_inputs()
        outputs = self.create_outputs()
        _inputs = _fill_tensors(inputs, _inputs)

        err = LIBNUX.nux_session_run(self.ref, _inputs, outputs)

        if is_err(err):
            raise into_exception(err)

        return outputs

    def close(self):
        """Close the session and release all resources belonging to the session"""
        if self.ref:
            LIBNUX.nux_session_destroy(self.ref)
            self.ref = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __del__(self):
        self.close()

Ancestors

Model

Class variables

var ref

Methods

def close(self)

Close the session and release all resources belonging to the session

Expand source code
def close(self):
    """Close the session and release all resources belonging to the session"""
    if self.ref:
        LIBNUX.nux_session_destroy(self.ref)
        self.ref = None
def run(self, inputs) ‑> TensorArray

Runs an inference task with inputs

Args

inputs
a single numpy.ndarray object, a list of numpy.ndarray objects, or a TensorArray

Returns

Inference output
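
A sketch of consuming the result; note that the numpy() accessor used
below is an assumption about nux.tensor.Tensor, not something defined in
this module:

import numpy as np
import nux.session as session

with session.create("model.onnx") as sess:
    outputs = sess.run(np.zeros((1, 1, 28, 28), dtype=np.float32))
    first = outputs[0]     # TensorArray supports indexing
    array = first.numpy()  # assumed accessor; see nux.tensor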

Expand source code
def run(self, inputs) -> TensorArray:
    """
    Runs an inference task with `inputs`

    Args:
        inputs: a single numpy.ndarray object, a list of \
        numpy.ndarray objects, or a TensorArray

    Returns:
        Inference output
    """
    _inputs = self.allocate_inputs()
    outputs = self.create_outputs()
    _inputs = _fill_tensors(inputs, _inputs)

    err = LIBNUX.nux_session_run(self.ref, _inputs, outputs)

    if is_err(err):
        raise into_exception(err)

    return outputs

Inherited members