8 import comm.datalayer.SubscriptionProperties
10 from comm.datalayer import NotifyInfo
13 import ctrlxdatalayer.clib_provider_node
14 from ctrlxdatalayer.clib_provider_node import C_DLR_SUBSCRIPTION
18 def _subscription_get_timestamp(sub: C_DLR_SUBSCRIPTION) -> ctypes.c_uint64:
19 """_subscription_get_timestamp
22 sub (C_DLR_SUBSCRIPTION): Reference to the subscription
27 return ctrlxdatalayer.clib.libcomm_datalayer.DLR_SubscriptionGetTimestamp(sub)
30 def _subscription_get_props(sub: C_DLR_SUBSCRIPTION) -> VariantRef:
31 """_subscription_get_props
34 sub (C_DLR_SUBSCRIPTION): Reference to the subscription
37 VariantRef: Properties of Subscription in a Variant (see sub_properties.fbs)
39 return VariantRef(ctrlxdatalayer.clib.libcomm_datalayer.DLR_SubscriptionGetProps(sub))
42 def _subscription_get_nodes(sub: C_DLR_SUBSCRIPTION) -> Variant:
43 """_subscription_get_nodes
46 sub (C_DLR_SUBSCRIPTION): Reference to the subscription
49 Variant: Subscribed nodes as array of strings
52 ctrlxdatalayer.clib.libcomm_datalayer.DLR_SubscriptionGetNodes(
69 containing notify_info.fbs (address, timestamp, type, ...)
72 __slots__ = ['__node', '__timestamp', '__notify_type', \
73 '__event_type', '__sequence_number', '__source_name']
75 def __init__(self, node_address: str):
81 self.__node = node_address
82 self.__timestamp = datetime.datetime.now()
100 dt (datetime.datetime):
117 nt (NotifyTypePublish):
132 In case of an event, this string contains the information
133 what EventType has been fired.
134 E.g.: "types/events/ExampleEvent"
150 """set_sequence_number
151 sequence number of an event
159 """get_sequence_number
168 description of the source of an event
186 class NotifyItemPublish:
189 __slots__ = ['__data', '__info', '__notify_info']
191 def __init__(self, node_address: str):
201 use the python context manager
205 def __exit__(self, exc_type, exc_val, exc_tb):
207 use the python context manager
228 Variant: data of the notify item
244 Variant: containing notify_info.fbs (address, timestamp, type, ...)
246 builder = flatbuffers.Builder(1024)
248 ni = NotifyInfo.NotifyInfoT()
250 ni.timestamp = Variant.to_filetime(self.__notify_info.get_timestamp())
252 ni.eventType = self.__notify_info.get_event_type()
253 ni.sequenceNumber = self.__notify_info.get_sequence_number()
254 ni.sourceName = self.__notify_info.get_source_name()
256 ni_int = ni.Pack(builder)
257 builder.Finish(ni_int)
259 self.__info.set_flatbuffers(builder.Output())
263 def _subscription_publish(sub: C_DLR_SUBSCRIPTION, status: Result, items: typing.List[NotifyItemPublish]) -> Result:
264 """_subscription_publish
267 sub (C_DLR_SUBSCRIPTION): Reference to the subscription
268 status (Result): Status of notification. On failure subscription is canceled for all items.
269 items (typing.List[NotifyItemPublish]): Notification items
274 elems = (ctrlxdatalayer.clib_provider_node.C_DLR_NOTIFY_ITEM * len(items))()
276 for i, item in enumerate(items):
277 elems[i].data = item.get_data().get_handle()
278 elems[i].info = item.get_info().get_handle()
280 notify_items = ctypes.cast(elems, ctypes.POINTER(
281 ctrlxdatalayer.clib_provider_node.C_DLR_NOTIFY_ITEM))
282 len_item = ctypes.c_size_t(len(items))
283 return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_SubscriptionPublish(sub, status.value, notify_items, len_item))
288 ProviderSubscription helper class
290 __slots__ = ['__subscription', '__id']
292 def __init__(self, sub: C_DLR_SUBSCRIPTION):
294 init ProviderSubscription
296 self.__subscription = sub
305 if self.__id is None:
306 self.__id = self.get_props().Id().decode('utf-8')
310 def get_props(self) -> comm.datalayer.SubscriptionProperties:
314 comm.datalayer.SubscriptionProperties: subscription properties
317 return comm.datalayer.SubscriptionProperties.SubscriptionProperties.GetRootAsSubscriptionProperties(v.get_flatbuffers(), 0)
323 datetime.datetime: timestamp
325 val = _subscription_get_timestamp(self.__subscription)
326 return Variant.from_filetime(val)
332 typing.List[str]: Subscribed nodes as array of strings
336 return val.get_array_string()
338 def publish(self, status: Result, items: typing.List[NotifyItemPublish]) -> Result:
342 status (Result): Status of notification. On failure subscription is canceled for all items.
343 items (typing.List[NotifyItemPublish]): Notification items
345 return _subscription_publish(self.__subscription, status, items)
def __init__(self, str node_address)
init
def set_event_type(self, str et)
set_event_type In case of an event, this string contains the information what EventType has been fired.
def set_notify_type(self, NotifyTypePublish nt)
set_notify_type
str get_source_name(self)
get_source_name
def get_timestamp(self)
get_timestamp
str get_event_type(self)
get_event_type
int get_sequence_number(self)
get_sequence_number
NotifyTypePublish get_notify_type(self)
get_notify_type
def set_timestamp(self, datetime.datetime dt)
set_timestamp
def set_sequence_number(self, int sn)
set_sequence_number sequence number of an event
def set_source_name(self, str source)
set_source_name description of the source of an event
str get_node(self)
get_node
NotifyInfoPublish get_notify_info(self)
get_notify_info
def __exit__(self, exc_type, exc_val, exc_tb)
use the python context manager
def __init__(self, str node_address)
init
def __enter__(self)
use the python context manager
Variant get_info(self)
internal use
def close(self)
closes the instance
Variant get_data(self)
get_data
ProviderSubscription helper class.
typing.List[str] get_notes(self)
get_notes
datetime.datetime get_timestamp(self)
timestamp
Result publish(self, Result status, typing.List[NotifyItemPublish] items)
publish
comm.datalayer.SubscriptionProperties get_props(self)
get_props
Variant is a container for many types of data.