2 Class Async Subscription
12 from ctrlxdatalayer.clib_client
import (C_DLR_CLIENT_NOTIFY_RESPONSE,
13 C_DLR_CLIENT_RESPONSE, C_NotifyItem)
14 from ctrlxdatalayer.clib_variant
import C_DLR_VARIANT
22 __slots__ = [
'__ptr_notify',
'__ptr_resp',
23 '__closed',
'__client',
'__id',
'__on_cb']
27 @param[in] client Reference to the client
32 self.
__client__client = weakref.ref(client)
38 use the python context manager
42 def __exit__(self, exc_type, exc_val, exc_tb):
44 use the python context manager
60 closes the client instance
70 def __close_sub(self):
71 print(
"close_sub:", self.
__id__id)
73 if self.
__id__id
is None or self.
__id__id ==
"":
76 def __cb_close(result: Result, data: typing.Optional[Variant], userdata: ctrlxdatalayer.clib.userData_c_void_p):
77 print(
"async close all: ", result, int(userdata))
82 def __create_sub_notify_callback(self, cb: ctrlxdatalayer.subscription.ResponseNotifyCallback):
89 def _cb(status: C_DLR_RESULT, items: ctypes.POINTER(C_NotifyItem), count: ctypes.c_uint32, userdata: ctypes.c_void_p):
91 datalayer calls this function
96 for x
in range(0, count):
98 items[x].data, items[x].info)
99 notify_items.append(n)
100 cb(r, notify_items, userdata)
105 cb_ptr.set_ptr(C_DLR_CLIENT_NOTIFY_RESPONSE(_cb))
106 return cb_ptr.get_ptr()
108 def _test_notify_callback(self, cb: ctrlxdatalayer.subscription.ResponseNotifyCallback):
114 def __create_response_callback(self, cb: ctrlxdatalayer.client.ResponseCallback):
122 def _cb(status: C_DLR_RESULT, data: C_DLR_VARIANT, userdata: ctypes.c_void_p):
124 datalayer calls this function
132 cb(r,
None, userdata)
135 cb_ptr.set_ptr(C_DLR_CLIENT_RESPONSE(_cb))
136 return cb_ptr.get_ptr()
138 def _test_response_callback(self, cb: ctrlxdatalayer.client.ResponseCallback):
144 def _create(self, prop: Variant, cnb: ctrlxdatalayer.subscription.ResponseNotifyCallback, cb: ctrlxdatalayer.client.ResponseCallback, userdata: userData_c_void_p =
None) -> Result:
146 Set up a subscription
147 @param[in] ruleset Variant that describe ruleset of subscription as subscription.fbs
148 @param[in] publishCallback Callback to call when new data is available
149 @param[in] callback Callback to be called when subscription is created
150 @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request and subscription
151 @param[in] token Security access &token for authentication as JWT payload
152 @result <Result> status of function call
154 r =
Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientCreateSubscriptionAsync(
155 self.
__client__client().get_handle(),
160 self.
__client__client().get_token()))
165 def subscribe(self, address: str, cb: ctrlxdatalayer.client.ResponseCallback, userdata: userData_c_void_p =
None) -> Result:
167 Set up a subscription to a node
168 @param[in] address Address of the node to add a subscription to
169 @param[in] callback Callback to called when data is subscribed
170 @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
171 @result <Result> status of function call
173 b_id = self.
ididid().encode(
'utf-8')
174 b_address = address.encode(
'utf-8')
175 return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientSubscribeAsync(
176 self.
__client__client().get_handle(),
182 def unsubscribe(self, address: str, cb: ctrlxdatalayer.client.ResponseCallback, userdata: userData_c_void_p =
None) -> Result:
184 Removes a node from a subscription id
185 @param[in] address Address of a node, that should be removed to the given subscription.
186 @param[in] callback Callback to called when data is subscribed
187 @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
188 @result <Result> status of function call
190 b_id = self.
ididid().encode(
'utf-8')
191 b_address = address.encode(
'utf-8')
192 return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeAsync(
193 self.
__client__client().get_handle(),
199 def subscribe_multi(self, address: typing.List[str], cb: ctrlxdatalayer.client.ResponseCallback, userdata: userData_c_void_p =
None) -> Result:
201 Set up a subscription to multiple nodes
202 @param[in] address Set of addresses of nodes, that should be removed to the given subscription.
203 @param[in] count Count of addresses.
204 @param[in] callback Callback to called when data is subscribed
205 @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
206 @result <Result> status of function call
208 b_id = self.
ididid().encode(
'utf-8')
209 b_address = (ctypes.c_char_p * len(address))(*
210 [d.encode(
'utf-8')
for d
in address])
211 return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientSubscribeMultiAsync(
212 self.
__client__client().get_handle(),
219 def unsubscribe_multi(self, address: typing.List[str], cb: ctrlxdatalayer.client.ResponseCallback, userdata: userData_c_void_p =
None) -> Result:
221 Removes a set of nodes from a subscription id
222 @param[in] address Address of a node, that should be removed to the given subscription.
223 @param[in] callback Callback to called when data is subscribed
224 @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
225 @result <Result> status of function call
227 b_id = self.
ididid().encode(
'utf-8')
228 b_address = (ctypes.c_char_p * len(address))(*
229 [d.encode(
'utf-8')
for d
in address])
230 return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeMultiAsync(
231 self.
__client__client().get_handle(),
238 def unsubscribe_all(self, cb: ctrlxdatalayer.client.ResponseCallback, userdata: userData_c_void_p =
None) -> Result:
240 Removes all subscriptions from a subscription id
241 @param[in] callback Callback to called when data is subscribed
242 @param[in] userdata User data - will be returned in callback as userdata. You can use this userdata to identify your request
243 @result <Result> status of function call
247 self.
__client__client()._unregister_sync(self)
248 b_id = self.
ididid().encode(
'utf-8')
249 return Result(ctrlxdatalayer.clib.libcomm_datalayer.DLR_clientUnsubscribeAllAsync(
250 self.
__client__client().get_handle(),
256 """ wait_on_response_cb """
260 while not self.
__on_cb__on_cb
and n < wait:
Client interface for accessing data from the system.
str id(self)
Subscription ID.
Result unsubscribe_multi(self, typing.List[str] address, ctrlxdatalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Removes a set of nodes from a subscription id.
bool wait_on_response_cb(self, int wait=5)
wait_on_response_cb
def __exit__(self, exc_type, exc_val, exc_tb)
use the python context manager
str id(self)
Subscription ID.
Result subscribe_multi(self, typing.List[str] address, ctrlxdatalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Set up a subscription to multiple nodes.
Result unsubscribe(self, str address, ctrlxdatalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Removes a node from a subscription id.
def __enter__(self)
use the python context manager
def __create_response_callback(self, ctrlxdatalayer.client.ResponseCallback cb)
def on_close(self)
on_close
def __init__(self, ctrlxdatalayer.client.Client client)
Result subscribe(self, str address, ctrlxdatalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Set up a subscription to a node.
def close(self)
closes the client instance
def __create_sub_notify_callback(self, ctrlxdatalayer.subscription.ResponseNotifyCallback cb)
Result unsubscribe_all(self, ctrlxdatalayer.client.ResponseCallback cb, userData_c_void_p userdata=None)
Removes all subscriptions from a subscription id.