ctrlX Data Layer API for Python  3.3.0
The ctrlX Data Layer API allows access to the ctrlX Data Layer with Python
provider_subscription.py
1 
2 
3 import ctypes
4 import datetime
5 from enum import Enum
6 import typing
7 
8 import comm.datalayer.SubscriptionProperties
9 import flatbuffers
10 from comm.datalayer import NotifyInfo
11 
12 import ctrlxdatalayer
13 import ctrlxdatalayer.clib_provider_node
14 from ctrlxdatalayer.clib_provider_node import C_DLR_SUBSCRIPTION
15 from ctrlxdatalayer.variant import Result, Variant, VariantRef
16 
17 
def _subscription_get_timestamp(sub: C_DLR_SUBSCRIPTION) -> ctypes.c_uint64:
    """Fetch the timestamp of a subscription from the C library.

    Thin wrapper around ``DLR_SubscriptionGetTimestamp``; the value is
    passed through unchanged.

    Args:
        sub (C_DLR_SUBSCRIPTION): Reference to the subscription

    Returns:
        c_uint64: timestamp
    """
    libcomm = ctrlxdatalayer.clib.libcomm_datalayer
    return libcomm.DLR_SubscriptionGetTimestamp(sub)
28 
29 
def _subscription_get_props(sub: C_DLR_SUBSCRIPTION) -> VariantRef:
    """Fetch the properties of a subscription from the C library.

    The handle returned by ``DLR_SubscriptionGetProps`` is wrapped in a
    ``VariantRef``.

    Args:
        sub (C_DLR_SUBSCRIPTION): Reference to the subscription

    Returns:
        VariantRef: Properties of Subscription in a Variant (see sub_properties.fbs)
    """
    props_handle = ctrlxdatalayer.clib.libcomm_datalayer.DLR_SubscriptionGetProps(sub)
    return VariantRef(props_handle)
40 
41 
def _subscription_get_nodes(sub: C_DLR_SUBSCRIPTION) -> Variant:
    """Fetch the subscribed nodes of a subscription from the C library.

    A fresh ``Variant`` is created and its handle is handed to
    ``DLR_SubscriptionGetNodes``, which fills it. The return value of the
    C call is not inspected here.

    Args:
        sub (C_DLR_SUBSCRIPTION): Reference to the subscription

    Returns:
        Variant: Subscribed nodes as array of strings
    """
    nodes = Variant()
    fetch_nodes = ctrlxdatalayer.clib.libcomm_datalayer.DLR_SubscriptionGetNodes
    fetch_nodes(sub, nodes.get_handle())
    return nodes
55 
class NotifyTypePublish(Enum):
    """Notification types that can be attached to a published item.

    The integer value of a member is what gets written into the
    ``notifyType`` field of the NotifyInfo flatbuffer.
    """

    DATA = 0
    BROWSE = 1
    METADATA = 2
    KEEPALIVE = 3
    TYPE = 4
    EVENT = 5
65 
class NotifyInfoPublish:
    """NotifyInfoPublish

    Plain holder for the fields of notify_info.fbs (address, timestamp,
    type, ...). All setters return ``self`` so calls can be chained.
    """

    # Attribute names must match these slot names exactly: the class has no
    # __dict__, so assigning any other attribute raises AttributeError.
    __slots__ = ['__node', '__timestamp', '__notify_type',
                 '__event_type', '__sequence_number', '__source_name']

    def __init__(self, node_address: str):
        """__init__

        Initialises the timestamp to the current local time, the notify
        type to ``NotifyTypePublish.DATA``, the sequence number to 0 and
        leaves event type and source name unset (``None``).

        Args:
            node_address (str): address of the node this info belongs to
        """
        self.__node = node_address
        self.__timestamp = datetime.datetime.now()
        self.__notify_type = NotifyTypePublish.DATA
        self.__event_type = None
        self.__sequence_number = 0
        self.__source_name = None

    def get_node(self) -> str:
        """get_node

        Returns:
            str: node address
        """
        return self.__node

    def set_timestamp(self, dt: datetime.datetime) -> "NotifyInfoPublish":
        """set_timestamp

        Args:
            dt (datetime.datetime): timestamp to store

        Returns:
            NotifyInfoPublish: this instance, for chaining
        """
        self.__timestamp = dt
        return self

    def get_timestamp(self) -> datetime.datetime:
        """get_timestamp

        Returns:
            datetime.datetime: stored timestamp
        """
        return self.__timestamp

    def set_notify_type(self, nt: NotifyTypePublish) -> "NotifyInfoPublish":
        """set_notify_type

        Args:
            nt (NotifyTypePublish): notification type to store

        Returns:
            NotifyInfoPublish: this instance, for chaining
        """
        self.__notify_type = nt
        return self

    def get_notify_type(self) -> NotifyTypePublish:
        """get_notify_type

        Returns:
            NotifyTypePublish: stored notification type
        """
        return self.__notify_type

    def set_event_type(self, et: str) -> "NotifyInfoPublish":
        """set_event_type

        In case of an event, this string contains the information
        what EventType has been fired.
        E.g.: "types/events/ExampleEvent"

        Args:
            et (str): event type

        Returns:
            NotifyInfoPublish: this instance, for chaining
        """
        self.__event_type = et
        return self

    def get_event_type(self) -> str:
        """get_event_type

        Returns:
            str: stored event type (``None`` until set_event_type is called)
        """
        return self.__event_type

    def set_sequence_number(self, sn: int) -> "NotifyInfoPublish":
        """set_sequence_number

        sequence number of an event

        Args:
            sn (int): 0 default

        Returns:
            NotifyInfoPublish: this instance, for chaining
        """
        self.__sequence_number = sn
        return self

    def get_sequence_number(self) -> int:
        """get_sequence_number

        Returns:
            int: stored sequence number
        """
        return self.__sequence_number

    def set_source_name(self, source: str) -> "NotifyInfoPublish":
        """set_source_name

        description of the source of an event

        Args:
            source (str): source description

        Returns:
            NotifyInfoPublish: this instance, for chaining
        """
        self.__source_name = source
        return self

    def get_source_name(self) -> str:
        """get_source_name

        Returns:
            str: stored source name (``None`` until set_source_name is called)
        """
        return self.__source_name
183 
class NotifyItemPublish:
    """
    class NotifyItemPublish:

    One item of a publish call: a data Variant, an info Variant (filled
    with a serialized NotifyInfo flatbuffer by get_info) and the
    NotifyInfoPublish the info Variant is built from. Usable as a context
    manager; leaving the ``with`` block closes both Variants.
    """
    # Attribute names must match these slot names exactly: the class has no
    # __dict__, so assigning any other attribute raises AttributeError.
    __slots__ = ['__data', '__info', '__notify_info']

    def __init__(self, node_address: str):
        """
        __init__

        Args:
            node_address (str): address of the node this item belongs to
        """
        self.__data = Variant()
        self.__info = Variant()
        self.__notify_info = NotifyInfoPublish(node_address)

    def __enter__(self):
        """
        use the python context manager
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        use the python context manager
        """
        self.close()

    def __del__(self):
        """
        __del__
        """
        self.close()

    def close(self):
        """
        closes the instance (both the data and the info Variant)
        """
        self.__data.close()
        self.__info.close()

    def get_data(self) -> Variant:
        """get_data

        Returns:
            Variant: data of the notify item
        """
        return self.__data

    def get_notify_info(self) -> NotifyInfoPublish:
        """get_notify_info

        Returns:
            NotifyInfoPublish: notify info of the item
        """
        return self.__notify_info

    def get_info(self) -> Variant:
        """internal use

        Packs the current notify info into a NotifyInfo flatbuffer and
        stores it in the info Variant. The buffer is rebuilt on every call.

        Returns:
            Variant: containing notify_info.fbs (address, timestamp, type, ...)
        """
        builder = flatbuffers.Builder(1024)
        notify_info = self.__notify_info

        ni = NotifyInfo.NotifyInfoT()
        ni.node = notify_info.get_node()
        ni.timestamp = Variant.to_filetime(notify_info.get_timestamp())
        # The flatbuffer field takes the enum's integer value, not the member.
        ni.notifyType = notify_info.get_notify_type().value
        ni.eventType = notify_info.get_event_type()
        ni.sequenceNumber = notify_info.get_sequence_number()
        ni.sourceName = notify_info.get_source_name()

        ni_int = ni.Pack(builder)
        builder.Finish(ni_int)

        self.__info.set_flatbuffers(builder.Output())
        return self.__info
261 
262 
def _subscription_publish(sub: C_DLR_SUBSCRIPTION, status: Result, items: typing.List[NotifyItemPublish]) -> Result:
    """Publish notification items on a subscription through the C library.

    Builds a C array of ``C_DLR_NOTIFY_ITEM`` with one entry per item
    (data handle and info handle) and passes it to
    ``DLR_SubscriptionPublish``.

    Args:
        sub (C_DLR_SUBSCRIPTION): Reference to the subscription
        status (Result): Status of notification. On failure subscription is canceled for all items.
        items (typing.List[NotifyItemPublish]): Notification items

    Returns:
        Result: result code of DLR_SubscriptionPublish
    """
    notify_item_type = ctrlxdatalayer.clib_provider_node.C_DLR_NOTIFY_ITEM
    count = len(items)

    c_items = (notify_item_type * count)()
    for idx, entry in enumerate(items):
        c_items[idx].data = entry.get_data().get_handle()
        c_items[idx].info = entry.get_info().get_handle()

    items_ptr = ctypes.cast(c_items, ctypes.POINTER(notify_item_type))
    raw_result = ctrlxdatalayer.clib.libcomm_datalayer.DLR_SubscriptionPublish(
        sub, status.value, items_ptr, ctypes.c_size_t(count))
    return Result(raw_result)
284 
285 
class ProviderSubscription:
    """
    ProviderSubscription helper class

    Wraps a C_DLR_SUBSCRIPTION reference and exposes its id, properties,
    timestamp, subscribed nodes and the publish call.
    """
    # Attribute names must match these slot names exactly: the class has no
    # __dict__, so assigning any other attribute raises AttributeError.
    __slots__ = ['__subscription', '__id']

    def __init__(self, sub: C_DLR_SUBSCRIPTION):
        """
        init ProviderSubscription

        Args:
            sub (C_DLR_SUBSCRIPTION): Reference to the subscription
        """
        self.__subscription = sub
        self.__id = None  # filled on the first get_id() call

    def get_id(self) -> str:
        """get_id

        The id is read from the subscription properties on the first call
        and cached afterwards.

        Returns:
            str: subscription id
        """
        if self.__id is None:
            self.__id = self.get_props().Id().decode('utf-8')

        return self.__id

    def get_props(self) -> comm.datalayer.SubscriptionProperties:
        """get_props

        Returns:
            comm.datalayer.SubscriptionProperties: subscription properties
        """
        v = _subscription_get_props(self.__subscription)
        return comm.datalayer.SubscriptionProperties.SubscriptionProperties.GetRootAsSubscriptionProperties(v.get_flatbuffers(), 0)

    def get_timestamp(self) -> datetime.datetime:
        """timestamp

        Returns:
            datetime.datetime: timestamp
        """
        val = _subscription_get_timestamp(self.__subscription)
        return Variant.from_filetime(val)

    def get_notes(self) -> typing.List[str]:
        """get_notes

        Returns:
            typing.List[str]: Subscribed nodes as array of strings
        """
        val = _subscription_get_nodes(self.__subscription)
        # The Variant is only needed to extract the strings; the with-block
        # releases it once the list has been read.
        with val:
            return val.get_array_string()

    def publish(self, status: Result, items: typing.List[NotifyItemPublish]) -> Result:
        """publish

        Args:
            status (Result): Status of notification. On failure subscription is canceled for all items.
            items (typing.List[NotifyItemPublish]): Notification items

        Returns:
            Result: result of the publish call
        """
        return _subscription_publish(self.__subscription, status, items)
def set_event_type(self, str et)
set_event_type In case of an event, this string contains the information about which EventType has been fired.
def set_notify_type(self, NotifyTypePublish nt)
set_notify_type
NotifyTypePublish get_notify_type(self)
get_notify_type
def set_timestamp(self, datetime.datetime dt)
set_timestamp
def set_sequence_number(self, int sn)
set_sequence_number sequence number of an event
def set_source_name(self, str source)
set_source_name description of the source of an event
NotifyInfoPublish get_notify_info(self)
get_notify_info
def __exit__(self, exc_type, exc_val, exc_tb)
use the python context manager
Result publish(self, Result status, typing.List[NotifyItemPublish] items)
publish
comm.datalayer.SubscriptionProperties get_props(self)
get_props
Variant is a container for many types of data.
Definition: variant.py:150