DynExp
Highly flexible laboratory automation for dynamically changing experiments.
Loading...
Searching...
No Matches
StreamManipulator.cpp
Go to the documentation of this file.
1// This file is part of DynExp.
2
3#include "stdafx.h"
4#include "StreamManipulator.h"
5
8
9PYBIND11_EMBEDDED_MODULE(PyModuleStreamManipulator, m)
10{
11 using namespace DynExpModule;
12
13 py::bind_vector<DynExpModule::PyStreamListType>(m, "StreamListType");
14 py::bind_vector<decltype(DynExpModule::PyStreamManipulatorOutputData::LastConsumedSampleIDsPerInputStream)>(m, "SampleIDListType");
15
16 py::class_<PyStreamManipulatorInputData>(m, "InputData")
17 .def(py::init<>())
18 .def_readonly("ModuleID", &PyStreamManipulatorInputData::ModuleID)
19 .def_readonly("LastExecutionTime", &PyStreamManipulatorInputData::LastExecutionTime)
20 .def_readwrite("InputStreams", &PyStreamManipulatorInputData::InputStreams)
21 .def_readwrite("OutputStreams", &PyStreamManipulatorInputData::OutputStreams)
22 .def_readonly("SaveFilename", & PyStreamManipulatorInputData::SaveFilename);
23
24 py::class_<PyStreamManipulatorOutputData>(m, "OutputData")
25 .def(py::init<>())
26 .def_readwrite("MinNextExecutionDelay", &PyStreamManipulatorOutputData::MinNextExecutionDelay)
27 .def_readwrite("MaxNextExecutionDelay", &PyStreamManipulatorOutputData::MaxNextExecutionDelay)
28 .def_readwrite("LastConsumedSampleIDsPerInputStream", &PyStreamManipulatorOutputData::LastConsumedSampleIDsPerInputStream);
29}
30
31namespace DynExpModule
32{
34 {
37
38 InputStreams.clear();
39 OutputStreams.clear();
40
41 SaveFilename.clear();
42 }
43
51
56
60
62 {
63 try
64 {
65 auto ModuleData = DynExp::dynamic_ModuleData_cast<StreamManipulator>(Instance.ModuleDataGetter());
66
67 bool IsNewDataAvlbl = false;
68 for (size_t i = 0; i < ManipulatorPyFuncInput.InputStreams.size(); ++i)
69 {
70 auto& Instrument = ModuleData->GetInputDataStreams()[i];
71 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
72 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
73
74 Instrument->ReadData();
75
77 SampleStream->GetNumRecentBasicSamples(ManipulatorPyFuncOutput.LastConsumedSampleIDsPerInputStream[i]))
78 IsNewDataAvlbl = true;
79 }
80
81 const auto now = std::chrono::system_clock::now();
85 !LastManipulatorPyFuncExecution.time_since_epoch().count()))
87
89 } // ModuleData and instruments' data unlocked here.
90 catch (const Util::TimeoutException& e)
91 {
92 if (NumFailedUpdateAttempts++ >= 3)
93 Instance.GetOwner().SetWarning(e);
94 }
95
97 }
98
117
118 void StreamManipulator::Step(Util::SynchronizedPointer<ModuleDataType>& ModuleData, const PyFuncType& StepFunction, bool UpdateLastExecutionTime) const
119 {
120 for (size_t i = 0; i < ManipulatorPyFuncInput.InputStreams.size(); ++i)
121 {
122 auto& Instrument = ModuleData->GetInputDataStreams()[i];
123 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
124 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
125
126 ManipulatorPyFuncInput.InputStreams[i].StreamSizeRead = SampleStream->GetStreamSizeRead();
127 ManipulatorPyFuncInput.InputStreams[i].StreamSizeWrite = SampleStream->GetStreamSizeWrite();
128 ManipulatorPyFuncInput.InputStreams[i].NumSamplesWritten = SampleStream->GetNumSamplesWritten();
129 ManipulatorPyFuncInput.InputStreams[i].Samples = SampleStream->ReadRecentBasicSamples(ManipulatorPyFuncOutput.LastConsumedSampleIDsPerInputStream[i]);
130 }
131 for (size_t i = 0; i < ManipulatorPyFuncInput.OutputStreams.size(); ++i)
132 {
133 auto& Instrument = ModuleData->GetOutputDataStreams()[i];
134 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
135 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
136
137 ManipulatorPyFuncInput.OutputStreams[i].StreamSizeRead = SampleStream->GetStreamSizeRead();
138 ManipulatorPyFuncInput.OutputStreams[i].StreamSizeWrite = SampleStream->GetStreamSizeWrite();
139 ManipulatorPyFuncInput.OutputStreams[i].NumSamplesWritten = SampleStream->GetNumSamplesWritten();
140 ManipulatorPyFuncInput.OutputStreams[i].Samples.clear();
141 }
142
144
146 {
147 py::gil_scoped_acquire acquire;
148
149 auto PyResult = StepFunction(&ManipulatorPyFuncInput);
150 if (!PyResult.is_none())
151 FuncOutput = PyResult.cast<PyStreamManipulatorOutputData>();
152 } // GIL released here.
153
154 if (UpdateLastExecutionTime)
155 LastManipulatorPyFuncExecution = std::chrono::system_clock::now();
158
159 for (size_t i = 0; i < ManipulatorPyFuncInput.InputStreams.size(); ++i)
160 {
161 auto& Instrument = ModuleData->GetInputDataStreams()[i];
162 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
163 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
164
165 if (FuncOutput.LastConsumedSampleIDsPerInputStream.size() > i
166 && FuncOutput.LastConsumedSampleIDsPerInputStream[i] < SampleStream->GetNumSamplesWritten()
167 && FuncOutput.LastConsumedSampleIDsPerInputStream[i] >= SampleStream->GetNumSamplesWritten() - SampleStream->GetStreamSizeRead())
169 else
170 ManipulatorPyFuncOutput.LastConsumedSampleIDsPerInputStream[i] = SampleStream->GetNumSamplesWritten();
171
172 if (ManipulatorPyFuncInput.InputStreams[i].ShouldCLear())
173 {
174 SampleStream->Clear();
176 }
177 }
178 for (size_t i = 0; i < ManipulatorPyFuncInput.OutputStreams.size(); ++i)
179 {
180 auto& Instrument = ModuleData->GetOutputDataStreams()[i];
181 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
182 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
183
184 if (ManipulatorPyFuncInput.OutputStreams[i].ShouldCLear())
185 {
186 SampleStream->Clear();
188 }
189
190 if (!ManipulatorPyFuncInput.OutputStreams[i].Samples.empty())
191 {
192 SampleStream->WriteBasicSamples(ManipulatorPyFuncInput.OutputStreams[i].Samples);
193 Instrument->WriteData();
194 }
195 }
196 }
197
199 {
201
202 auto ModuleParams = DynExp::dynamic_Params_cast<StreamManipulator>(Instance->ParamsGetter());
203 auto ModuleData = DynExp::dynamic_ModuleData_cast<StreamManipulator>(Instance->ModuleDataGetter());
204
205 Instance->LockObject(ModuleParams->InputDataStreams, ModuleData->GetInputDataStreams());
206 Instance->LockObject(ModuleParams->OutputDataStreams, ModuleData->GetOutputDataStreams());
207
208 if (ModuleParams->Communicator.ContainsID())
209 Instance->LockObject(ModuleParams->Communicator, ModuleData->GetCommunicator());
210
211 auto PythonCode = Util::ReadFromFile(ModuleParams->PythonCodePath.GetPath());
212 PythonCode = std::regex_replace(PythonCode, std::regex("\r\n"), "\n");
213 PythonCode = std::regex_replace(PythonCode, std::regex("\n"), std::string("\n") + Util::PyTab);
214
215 py::gil_scoped_acquire acquire;
217 py::exec("import PyModuleStreamManipulator as StreamManipulator");
218 py::exec("def " + ManipulatorPyFuncName + "():\n" +
219 Util::PyTab + PythonCode + "\n" +
220 Util::PyTab + "if 'on_init' in locals() and callable(on_init):\n" +
221 Util::PyTab + Util::PyTab + ManipulatorPyFuncName + ".init = on_init\n" +
222 Util::PyTab + "if 'on_step' in locals() and callable(on_step):\n" +
223 Util::PyTab + Util::PyTab + ManipulatorPyFuncName + ".step = on_step\n" +
224 Util::PyTab + "if 'on_exit' in locals() and callable(on_exit):\n" +
225 Util::PyTab + Util::PyTab + ManipulatorPyFuncName + ".exit = on_exit\n" +
226 Util::PyTab + "if 'on_finished' in locals() and callable(on_finished):\n" +
227 Util::PyTab + Util::PyTab + ManipulatorPyFuncName + ".finished = on_finished\n" +
228 Util::PyTab + "if 'on_start' in locals() and callable(on_start):\n" +
229 Util::PyTab + Util::PyTab + ManipulatorPyFuncName + ".start = on_start\n" +
230 Util::PyTab + "if 'on_stop' in locals() and callable(on_stop):\n" +
231 Util::PyTab + Util::PyTab + ManipulatorPyFuncName + ".stop = on_stop\n" +
232 Util::PyTab + "if 'on_trigger' in locals() and callable(on_trigger):\n" +
233 Util::PyTab + Util::PyTab + ManipulatorPyFuncName + ".trigger = on_trigger");
234 auto ManipulatorPyFunc = py::eval(ManipulatorPyFuncName);
235 ManipulatorPyFunc();
236 ManipulatorPyFuncInit = py::hasattr(ManipulatorPyFunc, "init") ? py::getattr(ManipulatorPyFunc, "init") : py::none();
237 ManipulatorPyFuncStep = py::hasattr(ManipulatorPyFunc, "step") ? py::getattr(ManipulatorPyFunc, "step") : py::none();
238 ManipulatorPyFuncExit = py::hasattr(ManipulatorPyFunc, "exit") ? py::getattr(ManipulatorPyFunc, "exit") : py::none();
239 ManipulatorPyFuncFinished = py::hasattr(ManipulatorPyFunc, "finished") ? py::getattr(ManipulatorPyFunc, "finished") : py::none();
240 ManipulatorPyFuncStart = py::hasattr(ManipulatorPyFunc, "start") ? py::getattr(ManipulatorPyFunc, "start") : py::none();
241 ManipulatorPyFuncStop = py::hasattr(ManipulatorPyFunc, "stop") ? py::getattr(ManipulatorPyFunc, "stop") : py::none();
242 ManipulatorPyFuncTrigger = py::hasattr(ManipulatorPyFunc, "trigger") ? py::getattr(ManipulatorPyFunc, "trigger") : py::none();
243
244 if (ModuleParams->Communicator.ContainsID() && ManipulatorPyFuncFinished)
246 if (ModuleParams->Communicator.ContainsID() && ManipulatorPyFuncStart)
248 if (ModuleParams->Communicator.ContainsID() && ManipulatorPyFuncStop)
250 if (ModuleParams->Communicator.ContainsID() && ManipulatorPyFuncTrigger)
252
254 for (size_t i = 0; i < ModuleData->GetInputDataStreams().GetList().size(); ++i)
255 {
256 auto& Instrument = ModuleData->GetInputDataStreams()[i];
257 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
258 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
259
260 ManipulatorPyFuncInput.InputStreams.emplace_back(SampleStream->IsBasicSampleTimeUsed(), Instrument->GetValueUnit());
262 }
263 for (size_t i = 0; i < ModuleData->GetOutputDataStreams().GetList().size(); ++i)
264 {
265 auto& Instrument = ModuleData->GetOutputDataStreams()[i];
266 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
267 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
268
269 ManipulatorPyFuncInput.OutputStreams.emplace_back(SampleStream->IsBasicSampleTimeUsed(), Instrument->GetValueUnit());
270 }
271
273
274 IsInitialized = true;
275 }
276
278 {
279 try
280 {
281 py::gil_scoped_acquire acquire;
283 py::exec("del " + ManipulatorPyFuncName);
284 }
285 catch (...)
286 {
287 // Swallow any exception which might arise from the shutdown of the module's Python part
288 // since a failure of that is not considered a severe error.
289 Util::EventLog().Log("[StreamManipulator] Shutting down Python part of module \"" + GetObjectName() + "\" failed.", Util::ErrorType::Warning);
290 }
291
292 auto ModuleData = DynExp::dynamic_ModuleData_cast<StreamManipulator>(Instance->ModuleDataGetter());
293
294 Instance->UnlockObject(ModuleData->GetInputDataStreams());
295 Instance->UnlockObject(ModuleData->GetOutputDataStreams());
296 Instance->UnlockObject(ModuleData->GetCommunicator());
297
303 }
304
305 void StreamManipulator::OnSetFilename(DynExp::ModuleInstance* Instance, const std::string& SaveFilename) const
306 {
307 // Add file extension on Python side.
309 }
310
312 {
313 auto ModuleData = DynExp::dynamic_ModuleData_cast<StreamManipulator>(Instance->ModuleDataGetter());
314
316 }
317
319 {
320 auto ModuleData = DynExp::dynamic_ModuleData_cast<StreamManipulator>(Instance->ModuleDataGetter());
321
323 }
324
326 {
327 auto ModuleData = DynExp::dynamic_ModuleData_cast<StreamManipulator>(Instance->ModuleDataGetter());
328
330 }
331
333 {
334 auto ModuleData = DynExp::dynamic_ModuleData_cast<StreamManipulator>(Instance->ModuleDataGetter());
335
337
338 if (ModuleData->GetCommunicator().valid())
339 ModuleData->GetCommunicator()->PostEvent(*this, FinishedEvent{});
340 }
341}
PYBIND11_EMBEDDED_MODULE(PyModuleStreamManipulator, m)
PYBIND11_MAKE_OPAQUE(DynExpModule::PyStreamListType)
Implementation of a module to process data stored in data stream instrument(s) with a Python script a...
Base class for all circular data streams based on Util::circularbuf.
void Clear()
Removes all samples from the stream's buffer.
static void import()
Make the Python interpreter import this module as PyModuleDataStreamInstrument. GIL has to be acquire...
Definition PyModules.cpp:52
This event signals that an action (like a measurement) started by a TriggerEvent has been completed.
void Init()
Called by ResetImpl(dispatch_tag<DynExp::ModuleDataBase>) overridden by this class to initialize the ...
void ResetImpl(dispatch_tag< ModuleDataBase >) override final
Refer to DynExp::ModuleDataBase::Reset(). Using tag dispatch mechanism to ensure that ResetImpl() of ...
void OnStart(DynExp::ModuleInstance *Instance) const
Called when receiving DynExpModule::StartEvent.
void Step(Util::SynchronizedPointer< ModuleDataType > &ModuleData, const PyFuncType &StepFunction, bool UpdateLastExecutionTime=true) const
Performs a single manipulation step by preparing input data for a call to the Python function on_step...
void OnInit(DynExp::ModuleInstance *Instance) const override final
This event is triggered right before the module thread starts. Override it to lock instruments this m...
size_t NumFailedUpdateAttempts
Counts how often StreamManipulator::ModuleMainLoop() contiguously failed due to an exception of type ...
std::chrono::time_point< std::chrono::system_clock > LastManipulatorPyFuncExecution
Time point when the on_step() Python function was invoked last.
std::atomic< bool > IsInitialized
Indicates whether the module has been fully initialized by finishing OnInit(). The variable is atomic...
void OnTrigger(DynExp::ModuleInstance *Instance) const
Called when receiving DynExpModule::TriggerEvent.
PyFuncType ManipulatorPyFuncStep
Handle to a Python function called for each manipulation step.
PyFuncType ManipulatorPyFuncFinished
Handle to a Python function called for DynExpModule::FinishedEvent.
PyStreamManipulatorOutputData ManipulatorPyFuncOutput
Output data returned from the on_step() and event handler Python functions.
Util::DynExpErrorCodes::DynExpErrorCodes ModuleMainLoop(DynExp::ModuleInstance &Instance) override final
Module main loop. The function is executed periodically by the module thread. Also refer to GetMainLo...
PyFuncType ManipulatorPyFuncExit
Handle to a Python function called on module termination.
void OnFinished(DynExp::ModuleInstance *Instance) const
Called when receiving DynExpModule::FinishedEvent.
void ResetImpl(dispatch_tag< ModuleBase >) override final
Refer to DynExp::Object::Reset(). Using tag dispatch mechanism to ensure that ResetImpl() of every de...
PyFuncType ManipulatorPyFuncStart
Handle to a Python function called for DynExpModule::StartEvent.
void OnStop(DynExp::ModuleInstance *Instance) const
Called when receiving DynExpModule::StopEvent.
PyFuncType ManipulatorPyFuncInit
Handle to a Python function called on module initialization.
void OnExit(DynExp::ModuleInstance *Instance) const override final
This event is triggered right before the module thread terminates (not due to an exception,...
PyFuncType ManipulatorPyFuncStop
Handle to a Python function called for DynExpModule::StopEvent.
std::string ManipulatorPyFuncName
Unique name of the Python function all the code of this StreamManipulator instance is declared in....
PyStreamManipulatorInputData ManipulatorPyFuncInput
Input data passed to the on_step() and event handler Python functions.
void OnSetFilename(DynExp::ModuleInstance *Instance, const std::string &Filename) const
Called when receiving DynExpModule::SetFilenameEvent.
PyFuncType ManipulatorPyFuncTrigger
Handle to a Python function called for DynExpModule::TriggerEvent.
static void Register(const ModuleBase &Listener, CallableT EventFunc, ItemIDType CommunicatorID=ItemIDNotSet)
Registers/Subscribes module Listener to the event with the event function EventFunc....
Definition Module.h:1262
static void Deregister(const ModuleBase &Listener)
Deregisters/unsubscribes module Listener from the event, regardless of the inter-module communicator ...
Definition Module.h:1271
const std::unique_ptr< ModuleDataType > ModuleData
Module data belonging to this ModuleBase instance.
Definition Module.h:788
Refer to ParamsBase::dispatch_tag.
Definition Module.h:191
Defines data for a thread belonging to a ModuleBase instance. Refer to RunnableInstance.
Definition Module.h:840
const ModuleBase::ModuleDataGetterType ModuleDataGetter
Getter for module's data. Refer to ModuleBase::ModuleDataGetterType.
Definition Module.h:872
ItemIDType GetID() const noexcept
Returns the ID of this Object instance. Thread-safe since ID is const.
Definition Object.h:2143
auto GetObjectName(const std::chrono::milliseconds Timeout=GetParamsTimeoutDefault) const
Returns the name of this Object instance.
Definition Object.h:2128
Refer to ParamsBase::dispatch_tag.
Definition Object.h:2018
const Object::ParamsGetterType ParamsGetter
Invoke to obtain the parameters (derived from ParamsBase) of Owner.
Definition Object.h:3710
void UnlockObject(LinkedObjectWrapperContainer< ObjectT > &ObjectWrapperContainer)
Unlocks an Object instance stored in the LinkedObjectWrapperContainer ObjectWrapperContainer....
Definition Object.h:3609
void LockObject(const ParamsBase::Param< ObjectLink< ObjectT > > &LinkParam, LinkedObjectWrapperContainer< ObjectT > &ObjectWrapperContainer, std::chrono::milliseconds Timeout=ObjectLinkBase::LockObjectTimeoutDefault)
Locks an Object instance referenced by a parameter LinkParam of type ParamsBase::Param< ObjectLink< O...
Definition Object.h:3593
const auto & GetOwner() const noexcept
Returns Owner.
Definition Object.h:3556
void Log(const std::string &Message, const ErrorType Type=ErrorType::Info, const size_t Line=0, const std::string &Function="", const std::string &File="", const int ErrorCode=0, const std::stacktrace &Trace={}) noexcept
Logs an event from information specified manually.
Definition Util.cpp:317
void Reset()
Removes the owned pybind11::object after locking the GIL.
Definition PyUtil.h:119
Pointer to lock a class derived from ISynchronizedPointerLockable for synchronizing between threads....
Definition Util.h:170
Thrown when an operation timed out before it could be completed, especially used for locking shared d...
Definition Exception.h:262
DynExp's module namespace contains the implementation of DynExp modules which extend DynExp's core fu...
std::vector< DynExpInstr::PyDataStreamInstrument > PyStreamListType
Type of a list of data stream instruments made available to Python.
DynExpErrorCodes
DynExp's error codes
Definition Exception.h:22
std::string ToStr(const T &Value, int Precision=-1)
Converts a (numeric) value of type T to a std::string using operator<< of std::stringstream.
Definition Util.h:688
EventLogger & EventLog()
This function holds a static EventLogger instance and returns a reference to it. DynExp uses only one...
Definition Util.cpp:517
static constexpr auto PyTab
Character sequence to indent a Python instruction by one level.
Definition PyUtil.h:19
std::string ReadFromFile(const QString &Filename)
Reads the entire content from a text file.
Definition QtUtil.cpp:250
Accumulates include statements to provide a precompiled header.
PyStreamListType OutputStreams
Vector of data stream instruments to write the resulting output samples to.
PyStreamListType InputStreams
Vector of data stream instruments with input samples to be manipulated.
void Reset()
Resets all member variables of this PyStreamManipulatorInputData instance back to their default value...
DynExp::ItemIDType ModuleID
ID of the module invoking the on_step() or an event handler Python function for stream manipulation.
std::string SaveFilename
Filename and path where the Python functions invoked by the StreamManipulator module can save data....
std::chrono::time_point< std::chrono::system_clock > LastExecutionTime
Time point when the on_step() Python function was invoked last.
Output data type returned from on_step() or event handler Python functions.
void Reset()
Resets all member variables of this PyStreamManipulatorOutputData instance back to their default valu...
std::chrono::milliseconds MinNextExecutionDelay
Time wo wait minimally before the on_step() Python function is called again when each input data stre...
std::chrono::milliseconds MaxNextExecutionDelay
Time to wait maximally before the on_step() Python function is called again when the input data strea...
std::vector< size_t > LastConsumedSampleIDsPerInputStream
Maintaining the order of input data streams in PyStreamManipulatorInputData::InputStreams as passed t...