ctrlX Data Layer API for Python  3.3.0
The ctrlX Data Layer API allows access to the ctrlX Data Layer with Python
bulk_util.py
1 """
2 bulk util
3 
4 """
5 import ctypes
6 import typing
7 
8 import ctrlxdatalayer
9 from ctrlxdatalayer.clib_bulk import (C_DLR_BULK, C_BulkRequest,
10  C_VecBulkRequest)
11 from ctrlxdatalayer.clib_variant import C_DLR_VARIANT
12 from ctrlxdatalayer.variant import Result, Variant, VariantRef, copy
13 
14 
15 class _Request:
16  """
17  class Bulk _Request
18  """
19  __slots__ = ['__address', '__data']
20 
21  def __init__(self, addr: str, data: Variant):
22  """
23  init Request parameters
24  """
25  self.__address__address = addr.encode('utf-8') # !!
26  self.__data__data = None if data is None else data.get_handle()
27 
28  def __enter__(self):
29  """
30  use the python context manager
31  """
32  return self
33 
34  def __exit__(self, exc_type, exc_val, exc_tb):
35  """
36  use the python context manager
37  """
38  self.closeclose()
39 
40  def close(self):
41  """
42  close
43  """
44  pass
45 
46  def get_address(self) -> bytes:
47  """
48  get_address
49 
50  Returns:
51  str: Address of the request
52  """
53  return self.__address
54 
55  def get_data(self) -> C_DLR_VARIANT:
56  """
57  get_data
58 
59  Returns:
60  C_DLR_VARIANT: Input data of the request
61  """
62  return self.__data__data
63 
64 
def _bulkDelete(bulk: C_DLR_BULK):
    """Delete a bulk by calling DLR_bulkDelete of the native library.

    Args:
        bulk (C_DLR_BULK): Reference to the bulk
    """
    lib = ctrlxdatalayer.clib.libcomm_datalayer
    lib.DLR_bulkDelete(bulk)
72 
73 
def _bulkCreate(size: int) -> C_DLR_BULK:
    """Create a bulk by calling DLR_bulkCreate of the native library.

    Args:
        size (int): size of bulk requests

    Returns:
        C_DLR_BULK: Reference to the bulk
    """
    lib = ctrlxdatalayer.clib.libcomm_datalayer
    # The native function takes the size as a size_t.
    native_size = ctypes.c_size_t(size)
    return lib.DLR_bulkCreate(native_size)
85 
86 
def _bulkSetRequestAddress(bulk: C_DLR_BULK, i: int, addr: bytes) -> Result:
    """Set the address of one request of a bulk (DLR_bulkSetRequestAddress).

    Args:
        bulk (C_DLR_BULK): Reference to the bulk
        i (int): index [0..]
        addr (bytes): address

    Returns:
        Result: status of function call
    """
    lib = ctrlxdatalayer.clib.libcomm_datalayer
    index = ctypes.c_size_t(i)
    status = lib.DLR_bulkSetRequestAddress(bulk, index, addr)
    # Wrap the raw return code of the native call.
    return Result(status)
100 
101 
def _bulkSetRequestData(bulk: C_DLR_BULK, i: int, data: C_DLR_VARIANT) -> Result:
    """Set the data of one request of a bulk (DLR_bulkSetRequestData).

    Args:
        bulk (C_DLR_BULK): Reference to the bulk
        i (int): index [0..]
        data (C_DLR_VARIANT): Argument of read or write data

    Returns:
        Result: status of function call
    """
    lib = ctrlxdatalayer.clib.libcomm_datalayer
    # Wrap the raw return code of the native call.
    return Result(lib.DLR_bulkSetRequestData(bulk, ctypes.c_size_t(i), data))
115 
116 
117 class _BulkCreator:
118  """
119  class _BulkCreator
120  """
121  __slots__ = ['__bulk']
122 
123  def __init__(self, reqs: typing.List[_Request]):
124  """
125  __init__
126 
127  Args:
128  reqs (typing.List[_Request]): List of request
129  """
130  self.__bulk = None
131  self.__create(reqs)
132 
133  def __enter__(self):
134  """
135  use the python context manager
136  """
137  return self
138 
139  def __exit__(self, exc_type, exc_val, exc_tb):
140  """
141  use the python context manager
142  """
143  self.closeclose()
144 
145  def close(self):
146  """
147  close
148  """
149  if self.__bulk__bulk is None:
150  return
151  _bulkDelete(self.__bulk__bulk)
152  self.__bulk__bulk = None
153 
154  def get_handle(self) -> C_DLR_BULK:
155  """get_handle
156 
157  Returns:
158  clib_client.C_DLR_BULK: handle value of bulk
159  """
160  return self.__bulk__bulk
161 
162  def __create(self, reqs: typing.List[_Request]):
163  """__create
164 
165  Args:
166  reqs (typing.List[_Request]): List of request
167 
168  Returns:
169  clib_client.C_DLR_BULK: handle value of bulk
170  """
171  req_len = len(reqs)
172  bulk = _bulkCreate(req_len)
173 
174  i = 0
175  for r in reqs:
176  result = _bulkSetRequestAddress(bulk, i, r.get_address())
177  if result != Result.OK:
178  print(f"set address error {result}")
179  return None
180  if r.get_data() is not None:
181  result = _bulkSetRequestData(bulk, i, r.get_data())
182  if result != Result.OK:
183  print(f"set data error {result}")
184  return None
185  i += 1
186  self.__bulk__bulk = bulk
187 
188 
def _createBulkRequest(count: int):
    """Create a bulk request vector by calling DLR_createBulkRequest.

    Args:
        count (int): size of bulk requests

    Returns:
        ctypes.POINTER(C_VecBulkRequest): Reference to the bulk request
    """
    lib = ctrlxdatalayer.clib.libcomm_datalayer
    # The native function takes the count as a size_t.
    native_count = ctypes.c_size_t(count)
    return lib.DLR_createBulkRequest(native_count)
199 
200 
def _deleteBulkRequest(bulk_req):
    """Delete a bulk request vector by calling DLR_deleteBulkRequest.

    Args:
        bulk_req (ctypes.POINTER(C_VecBulkRequest)): Reference to the bulk request
    """
    lib = ctrlxdatalayer.clib.libcomm_datalayer
    lib.DLR_deleteBulkRequest(bulk_req)
208 
209 
210 class _AsyncCreator:
211  """
212  class _AsyncCreator
213  """
214  __slots__ = ['__bulk_request']
215 
216  def __init__(self, reqs: typing.List[_Request]):
217  """
218  __init__
219 
220  Args:
221  reqs (typing.List[_Request]): List of request
222  """
223  self.__bulk_request = None
224  self.__create__create(reqs)
225 
226  def __enter__(self):
227  """
228  use the python context manager
229  """
230  return self
231 
232  def __exit__(self, exc_type, exc_val, exc_tb):
233  """
234  use the python context manager
235  """
236  self.closeclose()
237 
238  def close(self):
239  """
240  close
241  """
242  if self.__bulk_request__bulk_request is not None:
243  _deleteBulkRequest(self.__bulk_request__bulk_request)
244 
245  def get_bulk_request(self) -> ctypes.POINTER(C_VecBulkRequest):
246  """get_bulk_request
247 
248  Returns:
249  ctypes.POINTER: pointer of C_VecBulkRequest
250  """
251  return self.__bulk_request__bulk_request
252 
253  def __create(self, reqs: typing.List[_Request]):
254  """__create
255 
256  Args:
257  reqs (typing.List[_Request]): List of request
258 
259  Returns:
260  clib_client.C_DLR_BULK: handle value of bulk
261  """
262  req_len = len(reqs)
263  bulkreq = _createBulkRequest(req_len)
264  ptr = ctypes.cast(bulkreq[0].request, ctypes.POINTER(
265  C_BulkRequest*req_len))
266  i = 0
267  for r in reqs:
268  ptr[0][i].address = r.get_address()
269  if r.get_data() is not None:
270  copy(VariantRef(ptr[0][i].data), VariantRef(r.get_data()))
271  i += 1
272  self.__bulk_request__bulk_request = bulkreq
ctypes.POINTER(C_VecBulkRequest) get_bulk_request(self)
get_bulk_request
Definition: bulk_util.py:270
def __exit__(self, exc_type, exc_val, exc_tb)
use the python context manager
Definition: bulk_util.py:154
def __init__(self, typing.List[_Request] reqs)
init
Definition: bulk_util.py:141
def __enter__(self)
use the python context manager
Definition: bulk_util.py:148
def __create(self, typing.List[_Request] reqs)
Definition: bulk_util.py:184
C_DLR_BULK get_handle(self)
get_handle
Definition: bulk_util.py:171
def __exit__(self, exc_type, exc_val, exc_tb)
use the python context manager
Definition: bulk_util.py:39
def __init__(self, str addr, Variant data)
init Request parameters
Definition: bulk_util.py:26
def __enter__(self)
use the python context manager
Definition: bulk_util.py:33
bytes get_address(self)
get_address
Definition: bulk_util.py:54
C_DLR_VARIANT get_data(self)
get_data
Definition: bulk_util.py:63