ctrlX Data Layer API for Python 3.3.0
The ctrlX Data Layer API allows access to the ctrlX Data Layer with Python.
subscription_async.py
1 """
2 Class Async Subscription
3 """
4 import ctypes
5 import time
6 import typing
7 import weakref
8 
9 import ctrlxdatalayer
11 from ctrlxdatalayer.clib import C_DLR_RESULT, userData_c_void_p
12 from ctrlxdatalayer.clib_client import (C_DLR_CLIENT_NOTIFY_RESPONSE,
13  C_DLR_CLIENT_RESPONSE, C_NotifyItem)
14 from ctrlxdatalayer.clib_variant import C_DLR_VARIANT
15 from ctrlxdatalayer.variant import Result, Variant, VariantRef
16 
17 
19  """
20  SubscriptionAsync
21  """
22  __slots__ = ['__ptr_notify', '__ptr_resp',
23  '__closed', '__client', '__id', '__on_cb']
24 
def __init__(self, client: ctrlxdatalayer.client.Client):
    """
    Create an asynchronous subscription bound to a client.

    Only a weak reference to the client is stored, so this object does not
    keep the client alive.

    @param[in] client Reference to the client
    """
    # NOTE(review): two lines of the original listing (between the docstring
    # and the first assignment) are missing from this extract. They most
    # likely declared the two callback-holder slots; they are initialised to
    # None here so both slots always exist -- confirm against upstream.
    self.__ptr_notify = None
    self.__ptr_resp = None
    self.__closed = False
    self.__client = weakref.ref(client)
    # Empty until the subscription has been created successfully.
    self.__id = ""
    # Set to True by the response callback wrapper once a response arrived.
    self.__on_cb = False
35 
def __enter__(self):
    """
    Enter the context manager.

    @result the subscription itself, so it can be bound with `with ... as`
    """
    return self
41 
def __exit__(self, exc_type, exc_val, exc_tb):
    """
    Leave the context manager and close the subscription.

    The exception arguments are ignored; nothing is returned, so an
    exception raised inside the `with` body is not suppressed.
    """
    self.close()
47 
def on_close(self):
    """
    Close hook: simply delegates to close().
    """
    self.close()
51 
def id(self) -> str:
    """
    Subscription ID

    @result <str> the stored subscription id; an empty string until one
            has been assigned
    """
    return self.__id
57 
def close(self):
    """
    Close the subscription.

    Idempotent: the first call unsubscribes (via the private close helper)
    and drops the callback holders and the client reference; later calls
    return immediately.
    """
    if self.__closed:
        return
    # Mark as closed first so a re-entrant call cannot run the teardown twice.
    self.__closed = True
    self.__close_sub()
    # Release the callback holders and the client reference only after the
    # unsubscribe request above has been issued.
    self.__ptr_notify = None
    self.__ptr_resp = None
    self.__client = None
69 
def __close_sub(self):
    """
    Unsubscribe everything that belongs to this subscription.

    Does nothing (apart from the trace output) when no subscription id has
    been assigned. Otherwise issues an asynchronous "unsubscribe all"
    request and waits, with the default timeout of wait_on_response_cb(),
    for its response callback.
    """
    print("close_sub:", self.__id)

    # No id (None or empty string) means there is nothing to unsubscribe.
    if not self.__id:
        return

    def __cb_close(result: Result, data: typing.Optional[Variant], userdata: ctrlxdatalayer.clib.userData_c_void_p):
        # Trace-only callback; the payload is not used.
        print("async close all: ", result, int(userdata))

    # 1879 is passed as userdata and echoed back to the callback above;
    # presumably just an arbitrary marker for this request.
    self.unsubscribe_all(__cb_close, 1879)
    self.wait_on_response_cb()
81 
def __create_sub_notify_callback(self, cb: ctrlxdatalayer.subscription.ResponseNotifyCallback):
    """
    Wrap a Python notification callback for use by the C library.

    @param[in] cb Python callback invoked as cb(result, notify_items, userdata)
    @result the C function pointer wrapping cb
    """
    # NOTE(review): the line that creates `cb_ptr` is missing from the
    # extracted listing. It must yield an object offering set_ptr()/get_ptr();
    # the constructor below is a reconstruction -- confirm against upstream.
    cb_ptr = ctrlxdatalayer.client._CallbackPtr()
    # Stored on the instance, presumably so the ctypes function object stays
    # referenced for as long as the subscription lives.
    self.__ptr_notify = cb_ptr

    def _cb(status: C_DLR_RESULT, items: ctypes.POINTER(C_NotifyItem), count: ctypes.c_uint32, userdata: ctypes.c_void_p):
        """
        datalayer calls this function
        """
        r = Result(status)
        if r == Result.OK:
            # NOTE(review): the first half of the item-construction statement
            # is missing from the extracted listing; only its arguments
            # (items[x].data, items[x].info) are visible. The callee name is
            # a reconstruction -- confirm against upstream.
            notify_items = [
                ctrlxdatalayer.subscription.NotifyItem(
                    items[x].data, items[x].info)
                for x in range(count)
            ]
            cb(r, notify_items, userdata)
            return
        # On any non-OK status the callback still fires, with an empty list.
        cb(r, [], userdata)

    cb_ptr.set_ptr(C_DLR_CLIENT_NOTIFY_RESPONSE(_cb))
    return cb_ptr.get_ptr()
107 
def _test_notify_callback(self, cb: ctrlxdatalayer.subscription.ResponseNotifyCallback):
    """
    internal use

    Exposes the private notification-callback wrapper and returns whatever
    it returns.
    """
    return self.__create_sub_notify_callback(cb)
113 
def __create_response_callback(self, cb: ctrlxdatalayer.client.ResponseCallback):
    """
    Wrap a Python response callback for use by the C library.

    Resets the "response received" flag; the wrapper sets it again after
    cb has returned, which is what wait_on_response_cb() polls.

    @param[in] cb Python callback invoked as cb(result, data, userdata);
               data is None unless the result is Result.OK
    @result the C function pointer wrapping cb
    """
    self.__on_cb = False
    # NOTE(review): the line that creates `cb_ptr` is missing from the
    # extracted listing. It must yield an object offering set_ptr()/get_ptr();
    # the constructor below is a reconstruction -- confirm against upstream.
    cb_ptr = ctrlxdatalayer.client._CallbackPtr()
    # Stored on the instance, presumably so the ctypes function object stays
    # referenced until the response arrives.
    self.__ptr_resp = cb_ptr

    def _cb(status: C_DLR_RESULT, data: C_DLR_VARIANT, userdata: ctypes.c_void_p):
        """
        datalayer calls this function
        """
        r = Result(status)
        # The variant is wrapped only for a successful response.
        payload = VariantRef(data) if r == Result.OK else None
        cb(r, payload, userdata)
        # Flag is raised only after the user callback has returned.
        self.__on_cb = True

    cb_ptr.set_ptr(C_DLR_CLIENT_RESPONSE(_cb))
    return cb_ptr.get_ptr()
137 
def _test_response_callback(self, cb: ctrlxdatalayer.client.ResponseCallback):
    """
    internal use

    Exposes the private response-callback wrapper and returns whatever it
    returns.
    """
    return self.__create_response_callback(cb)
143 
def _create(self, prop: Variant, cnb: ctrlxdatalayer.subscription.ResponseNotifyCallback, cb: ctrlxdatalayer.client.ResponseCallback, userdata: userData_c_void_p = None) -> Result:
    """
    Set up a subscription
    @param[in] prop Variant that describes the ruleset of the subscription (subscription.fbs)
    @param[in] cnb Callback to call when new data is available
    @param[in] cb Callback to be called when the subscription is created
    @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request and subscription
    @result <Result> status of function call
    """
    # The client is held as a weak reference and dereferenced for each use.
    # The access token is taken from the client, not from a parameter.
    r = Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientCreateSubscriptionAsync(
        self.__client().get_handle(),
        prop.get_handle(),
        self.__create_sub_notify_callback(cnb),
        self.__create_response_callback(cb),
        userdata,
        self.__client().get_token()))
    if r == Result.OK:
        # NOTE(review): the body of this branch is missing from the extracted
        # listing. Storing the id read from the ruleset is a reconstruction
        # (the page's cross-references list `str get_id(Variant prop)`) --
        # confirm against upstream.
        self.__id = ctrlxdatalayer.subscription.get_id(prop)
    return r
164 
def subscribe(self, address: str, cb: ctrlxdatalayer.client.ResponseCallback, userdata: userData_c_void_p = None) -> Result:
    """
    Set up a subscription to a node
    @param[in] address Address of the node to add a subscription to
    @param[in] cb Callback to be called when data is subscribed
    @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
    @result <Result> status of function call
    """
    # The C API takes the subscription id and the address as UTF-8 bytes.
    b_id = self.id().encode('utf-8')
    b_address = address.encode('utf-8')
    return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientSubscribeAsync(
        self.__client().get_handle(),
        b_id,
        b_address,
        self.__create_response_callback(cb),
        userdata))
181 
def unsubscribe(self, address: str, cb: ctrlxdatalayer.client.ResponseCallback, userdata: userData_c_void_p = None) -> Result:
    """
    Removes a node from a subscription id
    @param[in] address Address of a node that should be removed from this subscription
    @param[in] cb Callback to be called with the response to the request
    @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
    @result <Result> status of function call
    """
    # The C API takes the subscription id and the address as UTF-8 bytes.
    b_id = self.id().encode('utf-8')
    b_address = address.encode('utf-8')
    return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeAsync(
        self.__client().get_handle(),
        b_id,
        b_address,
        self.__create_response_callback(cb),
        userdata))
198 
def subscribe_multi(self, address: typing.List[str], cb: ctrlxdatalayer.client.ResponseCallback, userdata: userData_c_void_p = None) -> Result:
    """
    Set up a subscription to multiple nodes
    @param[in] address List of addresses of nodes that should be added to this subscription
    @param[in] cb Callback to be called when data is subscribed
    @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
    @result <Result> status of function call
    """
    b_id = self.id().encode('utf-8')
    # Build a C array of char* holding the UTF-8 encoded addresses; its
    # length is passed to the C API separately.
    b_address = (ctypes.c_char_p * len(address))(
        *[d.encode('utf-8') for d in address])
    return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientSubscribeMultiAsync(
        self.__client().get_handle(),
        b_id,
        b_address,
        len(address),
        self.__create_response_callback(cb),
        userdata))
218 
def unsubscribe_multi(self, address: typing.List[str], cb: ctrlxdatalayer.client.ResponseCallback, userdata: userData_c_void_p = None) -> Result:
    """
    Removes a set of nodes from a subscription id
    @param[in] address List of addresses of nodes that should be removed from this subscription
    @param[in] cb Callback to be called with the response to the request
    @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
    @result <Result> status of function call
    """
    b_id = self.id().encode('utf-8')
    # Build a C array of char* holding the UTF-8 encoded addresses; its
    # length is passed to the C API separately.
    b_address = (ctypes.c_char_p * len(address))(
        *[d.encode('utf-8') for d in address])
    return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeMultiAsync(
        self.__client().get_handle(),
        b_id,
        b_address,
        len(address),
        self.__create_response_callback(cb),
        userdata))
237 
def unsubscribe_all(self, cb: ctrlxdatalayer.client.ResponseCallback, userdata: userData_c_void_p = None) -> Result:
    """
    Removes all subscriptions from a subscription id
    @param[in] cb Callback to be called with the response to the request
    @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
    @result <Result> status of function call; None when the client
            reference has already been dropped (subscription closed)
    """
    # close() sets the client reference to None; nothing can be sent then.
    # NOTE(review): this path returns None although the signature promises
    # a Result -- callers must tolerate None.
    if self.__client is None:
        return None
    # Detach this subscription from the client before unsubscribing.
    self.__client()._unregister_sync(self)
    b_id = self.id().encode('utf-8')
    return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeAllAsync(
        self.__client().get_handle(),
        b_id,
        self.__create_response_callback(cb),
        userdata))
254 
def wait_on_response_cb(self, wait: int = 5) -> bool:
    """
    Block until the pending response callback has run or the wait expires.

    The "response received" flag is polled once per second.

    @param[in] wait Maximum number of one-second polls; values <= 0 fall
               back to 5
    @result <bool> True if the response callback has run, False on timeout
    """
    if wait <= 0:
        wait = 5
    n = 0
    while not self.__on_cb and n < wait:
        n = n + 1
        time.sleep(1)
    return self.__on_cb
Client interface for accessing data from the system.
Definition: client.py:61
Result unsubscribe_multi(self, typing.List[str] address, ctrlxdatalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Removes a set of nodes from a subscription id.
bool wait_on_response_cb(self, int wait=5)
wait_on_response_cb
def __exit__(self, exc_type, exc_val, exc_tb)
use the python context manager
Result subscribe_multi(self, typing.List[str] address, ctrlxdatalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Set up a subscription to multiple nodes.
Result unsubscribe(self, str address, ctrlxdatalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Removes a node from a subscription id.
def __enter__(self)
use the python context manager
def __create_response_callback(self, ctrlxdatalayer.client.ResponseCallback cb)
def __init__(self, ctrlxdatalayer.client.Client client)
Result subscribe(self, str address, ctrlxdatalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Set up a subscription to a node.
def __create_sub_notify_callback(self, ctrlxdatalayer.subscription.ResponseNotifyCallback cb)
Result unsubscribe_all(self, ctrlxdatalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Removes all subscriptions from a subscription id.
str get_id(Variant prop)