ctrlX Data Layer API for Python  3.3.0
The ctrlX Data Layer API allows access to the ctrlX Data Layer with Python
subscription_sync.py
1 """
2 Class Sync Subscription
3 """
4 import ctypes
5 import typing
6 import weakref
7 
8 import ctrlxdatalayer
11 from ctrlxdatalayer.clib import userData_c_void_p
12 from ctrlxdatalayer.clib_client import (C_DLR_CLIENT_NOTIFY_RESPONSE,
13  C_NotifyItem)
14 from ctrlxdatalayer.variant import Result, Variant
15 
16 
18  """
19  SubscriptionSync
20  """
21  __slots__ = ['__ptr_notify', '__closed', '__client', '__id', '__mock']
22 
23  def __init__(self, client: ctrlxdatalayer.client.Client):
24  """
25  @param [in] client Reference to the client
26  """
28  self.__closed__closed = False
29  self.__client__client = weakref.ref(client)
30  self.__id__id = ""
31  self.__mock__mock = False
32 
33  def __enter__(self):
34  """
35  use the python context manager
36  """
37  return self
38 
39  def __exit__(self, exc_type, exc_val, exc_tb):
40  """
41  use the python context manager
42  """
43  self.closeclose()
44 
45  def on_close(self):
46  """
47  on_close
48  """
49  self.closeclose()
50 
51  def id(self) -> str:
52  """
53  Subscription ID
54 
55  @return <str> id
56  """
57  return self.__id__id
58 
59  def __create_sub_callback(self, cb: ctrlxdatalayer.subscription.ResponseNotifyCallback):
60  """
61  callback management
62  """
64  self.__ptr_notify__ptr_notify = cb_ptr
65 
66  def _cb(status: ctrlxdatalayer.clib.C_DLR_RESULT, items: ctypes.POINTER(C_NotifyItem),
67  count: ctypes.c_uint32, userdata: ctypes.c_void_p):
68  """
69  datalayer calls this function
70  """
71  r = Result(status)
72  if r == Result.OK:
73  notify_items = []
74  for x in range(0, count):
76  items[x].data, items[x].info)
77  if not self.__mock__mock and len(n.get_address()) == 0: #empty address???
78  continue
79  notify_items.append(n)
80  cb(r, notify_items, userdata)
81  del notify_items
82  return
83  cb(r, [], userdata)
84 
85  cb_ptr.set_ptr(C_DLR_CLIENT_NOTIFY_RESPONSE(_cb))
86  return cb_ptr.get_ptr()
87 
88  def _test_notify_callback(self, cb: ctrlxdatalayer.subscription.ResponseNotifyCallback):
89  """
90  internal use
91  """
92  self.__mock__mock = True
93  return self.__create_sub_callback__create_sub_callback(cb)
94 
95  def close(self):
96  """
97  closes the client instance
98  """
99  if self.__closed__closed:
100  return
101  self.__closed__closed = True
102  self.unsubscribe_allunsubscribe_all()
103  self.__ptr_notify__ptr_notify = None
104  self.__client__client = None
105 
106  def _create(self, prop: Variant, cnb: ctrlxdatalayer.subscription.ResponseNotifyCallback,
107  userdata: userData_c_void_p = None) -> Result:
108  """
109  Set up a subscription
110  @param[in] ruleset Variant that describe ruleset of subscription as subscription.fbs
111  @param[in] publishCallback Callback to call when new data is available
112  @param[in] userdata User data - will be returned in publishCallback as userdata. You can use this userdata to identify your subscription
113  @result <Result> status of function cal
114  """
115  r = Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientCreateSubscriptionSync(
116  self.__client__client().get_handle(), prop.get_handle(),
117  self.__create_sub_callback__create_sub_callback(cnb), userdata, self.__client__client().get_token()))
118  if r == Result.OK:
120  return r
121 
122  def subscribe(self, address: str) -> Result:
123  """
124  Adds a node to a subscription id
125  @param[in] address Address of a node, that should be added to the given subscription.
126  @result <Result> status of function call
127  """
128  b_id = self.ididid().encode('utf-8')
129  b_address = address.encode('utf-8')
130  return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientSubscribeSync(
131  self.__client__client().get_handle(), b_id, b_address))
132 
133  def unsubscribe(self, address: str) -> Result:
134  """
135  Removes a node from a subscription id
136  @param[in] address Address of a node, that should be removed to the given subscription.
137  @result <Result> status of function call
138  """
139  b_id = self.ididid().encode('utf-8')
140  b_address = address.encode('utf-8')
141  return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeSync(
142  self.__client__client().get_handle(), b_id, b_address))
143 
144  def subscribe_multi(self, address: typing.List[str]) -> Result:
145  """
146  Adds a list of nodes to a subscription id
147  @param[in] address List of Addresses of a node, that should be added to the given subscription.
148  @param[in] count Count of addresses.
149  @result <Result> status of function call
150  """
151  b_id = self.ididid().encode('utf-8')
152  b_address = (ctypes.c_char_p * len(address))(*
153  [d.encode('utf-8') for d in address])
154  return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientSubscribeMultiSync(
155  self.__client__client().get_handle(), b_id, b_address, len(address)))
156 
157  def unsubscribe_multi(self, address: typing.List[str]) -> Result:
158  """
159  Removes a set of nodes from a subscription id
160  @param[in] address Set of addresses of nodes, that should be removed to the given subscription.
161  @result <Result> status of function call
162  """
163  b_id = self.ididid().encode('utf-8')
164  b_address = (ctypes.c_char_p * len(address))(*
165  [d.encode('utf-8') for d in address])
166  return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeMultiSync(
167  self.__client__client().get_handle(), b_id, b_address, len(address)))
168 
169  def unsubscribe_all(self) -> Result:
170  """
171  Removes subscription id completely
172  @result <Result> status of function call
173  """
174  if self.__client__client is None:
175  return
176  self.__client__client()._unregister_sync(self)
177  b_id = self.ididid().encode('utf-8')
178  return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeAllSync(
179  self.__client__client().get_handle(), b_id))
Client interface for accessing data from the system.
Definition: client.py:61
Result unsubscribe(self, str address)
Removes a node from a subscription id.
def __exit__(self, exc_type, exc_val, exc_tb)
use the python context manager
Result unsubscribe_all(self)
Removes subscription id completely.
def __enter__(self)
use the python context manager
Result unsubscribe_multi(self, typing.List[str] address)
Removes a set of nodes from a subscription id.
Result subscribe_multi(self, typing.List[str] address)
Adds a list of nodes to a subscription id.
def __init__(self, ctrlxdatalayer.client.Client client)
def close(self)
closes the client instance
def __create_sub_callback(self, ctrlxdatalayer.subscription.ResponseNotifyCallback cb)
Result subscribe(self, str address)
Adds a node to a subscription id.
str get_id(Variant prop)