DynExp
Highly flexible laboratory automation for dynamically changing experiments.
StreamManipulator.cpp
Go to the documentation of this file.
1 // This file is part of DynExp.
2 
3 #include "stdafx.h"
4 #include "StreamManipulator.h"
5 
8 
9 PYBIND11_EMBEDDED_MODULE(PyModuleStreamManipulator, m) // Defines the embedded Python module PyModuleStreamManipulator; m is the module handle the bindings below are attached to.
10 {
11  using namespace DynExpModule;
12 // Bind the two vector types under the Python names "StreamListType" and "SampleIDListType".
13  py::bind_vector<DynExpModule::PyStreamListType>(m, "StreamListType");
14  py::bind_vector<decltype(DynExpModule::PyStreamManipulatorOutputData::LastConsumedSampleIDsPerInputStream)>(m, "SampleIDListType");
15 // Python class "InputData": default-constructible; ModuleID and LastExecutionTime are read-only from Python, the stream lists are read-write.
16  py::class_<PyStreamManipulatorInputData>(m, "InputData")
17  .def(py::init<>())
18  .def_readonly("ModuleID", &PyStreamManipulatorInputData::ModuleID)
19  .def_readonly("LastExecutionTime", &PyStreamManipulatorInputData::LastExecutionTime)
20  .def_readwrite("InputStreams", &PyStreamManipulatorInputData::InputStreams)
21  .def_readwrite("OutputStreams", &PyStreamManipulatorInputData::OutputStreams);
22 // Python class "OutputData": default-constructible; all three exposed members are read-write from Python.
23  py::class_<PyStreamManipulatorOutputData>(m, "OutputData")
24  .def(py::init<>())
25  .def_readwrite("MinNextExecutionDelay", &PyStreamManipulatorOutputData::MinNextExecutionDelay)
26  .def_readwrite("MaxNextExecutionDelay", &PyStreamManipulatorOutputData::MaxNextExecutionDelay)
27  .def_readwrite("LastConsumedSampleIDsPerInputStream", &PyStreamManipulatorOutputData::LastConsumedSampleIDsPerInputStream);
28 }
29 
30 namespace DynExpModule
31 {
33  { // NOTE(review): the signature line (32) and line 34 are missing from this listing; presumably this is PyStreamManipulatorInputData::Reset() - confirm against the repository.
35  LastExecutionTime = {}; // Value-initialize the stored time point.
36 
37  InputStreams.clear(); // Remove all entries from both stream lists.
38  OutputStreams.clear();
39  }
40 
42  {
45 
47  }
48 
50  { // NOTE(review): the signature line (49) is missing from this listing; presumably the ResetImpl() overload of the module data class - confirm.
51  Init(); // The only visible work is delegating to Init().
52  }
53 
55  {
56  }
57 
59  { // NOTE(review): the signature line (58) is missing from this listing; presumably StreamManipulator::ModuleMainLoop(DynExp::ModuleInstance& Instance) - confirm.
60  try
61  {
62  auto ModuleData = DynExp::dynamic_ModuleData_cast<StreamManipulator>(Instance.ModuleDataGetter());
63 
64  bool IsNewDataAvlbl = false; // Set to true in the loop below depending on GetNumRecentBasicSamples().
65  for (size_t i = 0; i < ManipulatorPyFuncInput.InputStreams.size(); ++i) // InputStreams and GetInputDataStreams() are indexed in parallel by i.
66  {
67  auto& Instrument = ModuleData->GetInputDataStreams()[i];
68  auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
69  auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
70 
71  Instrument->ReadData(); // Issued before the stream is queried for recent samples.
72 // NOTE(review): line 73 (the beginning of the following condition) is missing from this listing.
74  SampleStream->GetNumRecentBasicSamples(ManipulatorPyFuncOutput.LastConsumedSampleIDsPerInputStream[i])) // Queries the samples relative to the last consumed sample ID of stream i.
75  IsNewDataAvlbl = true;
76  }
77 
78  const auto now = std::chrono::system_clock::now();
81  !LastManipulatorPyFuncExecution.time_since_epoch().count()) // NOTE(review): lines 79-80 are missing; this visible operand is true while LastManipulatorPyFuncExecution still holds a zero tick count.
83 // NOTE(review): lines 82 and 84 are missing from this listing, so the action taken when the condition holds is not visible here.
85  } // ModuleData and instruments' data unlocked here.
86  catch (const Util::TimeoutException& e) // Only timeouts are handled here; other exception types propagate.
87  {
88  if (NumFailedUpdateAttempts++ >= 3) // Post-increment: the warning is raised only once the counter was already >= 3 before this failure.
89  Instance.GetOwner().SetWarning(e);
90  }
91 // NOTE(review): line 92 (presumably the return statement) is missing from this listing.
93  }
94 
96  { // NOTE(review): the signature line (95) is missing from this listing; presumably StreamManipulator::ResetImpl(dispatch_tag<ModuleBase>) - confirm.
97  IsInitialized = false; // Mark the module as not initialized.
98 
99  ManipulatorPyFuncName = "StreamManipFunc_" + Util::ToStr(GetID()); // The Python function name embeds this object's ID.
105 // NOTE(review): lines 100-104 and 106-107 are missing from this listing; further members are presumably reset there.
108  }
109 
111  { // NOTE(review): the signature line (110) is missing from this listing; presumably StreamManipulator::Step(Util::SynchronizedPointer<ModuleDataType>& ModuleData) - confirm.
112  for (size_t i = 0; i < ManipulatorPyFuncInput.InputStreams.size(); ++i) // Phase 1: copy metadata and recent samples of every input stream into the Python input structure.
113  {
114  auto& Instrument = ModuleData->GetInputDataStreams()[i];
115  auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
116  auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
117 
118  ManipulatorPyFuncInput.InputStreams[i].StreamSizeRead = SampleStream->GetStreamSizeRead();
119  ManipulatorPyFuncInput.InputStreams[i].StreamSizeWrite = SampleStream->GetStreamSizeWrite();
120  ManipulatorPyFuncInput.InputStreams[i].NumSamplesWritten = SampleStream->GetNumSamplesWritten();
121  ManipulatorPyFuncInput.InputStreams[i].Samples = SampleStream->ReadRecentBasicSamples(ManipulatorPyFuncOutput.LastConsumedSampleIDsPerInputStream[i]); // Samples are read relative to the last consumed sample ID of stream i.
122  }
123  for (size_t i = 0; i < ManipulatorPyFuncInput.OutputStreams.size(); ++i) // Phase 2: copy metadata of every output stream and start with an empty sample list.
124  {
125  auto& Instrument = ModuleData->GetOutputDataStreams()[i];
126  auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
127  auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
128 
129  ManipulatorPyFuncInput.OutputStreams[i].StreamSizeRead = SampleStream->GetStreamSizeRead();
130  ManipulatorPyFuncInput.OutputStreams[i].StreamSizeWrite = SampleStream->GetStreamSizeWrite();
131  ManipulatorPyFuncInput.OutputStreams[i].NumSamplesWritten = SampleStream->GetNumSamplesWritten();
132  ManipulatorPyFuncInput.OutputStreams[i].Samples.clear();
133  }
134 
136 // NOTE(review): lines 135 and 137 are missing from this listing; FuncOutput, used below, is presumably declared there.
138  {
139  py::gil_scoped_acquire acquire; // Hold the Python GIL for the duration of this scope.
140 // NOTE(review): line 141 is missing from this listing; PyResult is presumably the result of calling the Python step function.
142  if (!PyResult.is_none()) // A None result leaves FuncOutput unchanged.
143  FuncOutput = PyResult.cast<PyStreamManipulatorOutputData>();
144  } // GIL released here.
145 
146  LastManipulatorPyFuncExecution = std::chrono::system_clock::now(); // Remember when the Python call finished.
149 // Phase 3: decide per input stream which sample ID counts as consumed (lines 147-148 are missing from this listing).
150  for (size_t i = 0; i < ManipulatorPyFuncInput.InputStreams.size(); ++i)
151  {
152  auto& Instrument = ModuleData->GetInputDataStreams()[i];
153  auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
154  auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
155 
156  if (FuncOutput.LastConsumedSampleIDsPerInputStream.size() > i // An entry for stream i must exist in the returned list ...
157  && FuncOutput.LastConsumedSampleIDsPerInputStream[i] < SampleStream->GetNumSamplesWritten() // ... it must lie below the number of samples written ...
158  && FuncOutput.LastConsumedSampleIDsPerInputStream[i] >= SampleStream->GetNumSamplesWritten() - SampleStream->GetStreamSizeWrite()) // ... and not below (written - write size). NOTE(review): if both getters return unsigned values, this difference wraps around while fewer samples than the write size have been written, which forces the else branch - confirm this is intended. Line 159 (the branch body) is missing from this listing.
160  else
161  ManipulatorPyFuncOutput.LastConsumedSampleIDsPerInputStream[i] = SampleStream->GetNumSamplesWritten(); // Fallback: treat everything written so far as consumed.
162  }
163  for (size_t i = 0; i < ManipulatorPyFuncInput.OutputStreams.size(); ++i) // Phase 4: write samples found in the output stream entries to the corresponding instruments.
164  {
165  auto& Instrument = ModuleData->GetOutputDataStreams()[i];
166  auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
167  auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
168 
169  if (!ManipulatorPyFuncInput.OutputStreams[i].Samples.empty()) // Streams without new samples are skipped entirely.
170  {
171  SampleStream->WriteBasicSamples(ManipulatorPyFuncInput.OutputStreams[i].Samples);
172  Instrument->WriteData();
173  }
174  }
175  }
176 
178  { // NOTE(review): the signature line (177) is missing from this listing; presumably StreamManipulator::OnInit(DynExp::ModuleInstance* Instance) - confirm.
179  auto ModuleParams = DynExp::dynamic_Params_cast<StreamManipulator>(Instance->ParamsGetter());
180  auto ModuleData = DynExp::dynamic_ModuleData_cast<StreamManipulator>(Instance->ModuleDataGetter());
181 // Lock the objects referenced by the input/output data stream parameters into the module data's containers.
182  Instance->LockObject(ModuleParams->InputDataStreams, ModuleData->GetInputDataStreams());
183  Instance->LockObject(ModuleParams->OutputDataStreams, ModuleData->GetOutputDataStreams());
184 // Load the script from PythonCodePath, convert CRLF to LF and prefix every line after the first with one PyTab; the first line receives its PyTab at line 193.
185  auto PythonCode = Util::ReadFromFile(ModuleParams->PythonCodePath.GetPath());
186  PythonCode = std::regex_replace(PythonCode, std::regex("\r\n"), "\n");
187  PythonCode = std::regex_replace(PythonCode, std::regex("\n"), std::string("\n") + Util::PyTab);
188 // NOTE(review): the file content is executed verbatim as Python code below; this assumes PythonCodePath points to a trusted script.
189  py::gil_scoped_acquire acquire; // Holds the Python GIL until the end of this function (line 190 is missing from this listing).
191  py::exec("import PyModuleStreamManipulator as StreamManipulator");
192  py::exec("def " + ManipulatorPyFuncName + "():\n" + // Wrap the script in a function named ManipulatorPyFuncName; if the script defines callable on_init/on_step/on_exit, they are stored as attributes init/step/exit on that function.
193  Util::PyTab + PythonCode + "\n" +
194  Util::PyTab + "if 'on_init' in locals() and callable(on_init):\n" +
195  Util::PyTab + Util::PyTab + ManipulatorPyFuncName + ".init = on_init\n" +
196  Util::PyTab + "if 'on_step' in locals() and callable(on_step):\n" +
197  Util::PyTab + Util::PyTab + ManipulatorPyFuncName + ".step = on_step\n" +
198  Util::PyTab + "if 'on_exit' in locals() and callable(on_exit):\n" +
199  Util::PyTab + Util::PyTab + ManipulatorPyFuncName + ".exit = on_exit");
200  auto ManipulatorPyFunc = py::eval(ManipulatorPyFuncName);
201  ManipulatorPyFunc(); // Calling the wrapper once runs the script body and attaches the attributes.
202  ManipulatorPyFuncInit = py::hasattr(ManipulatorPyFunc, "init") ? py::getattr(ManipulatorPyFunc, "init") : py::none(); // Each handle falls back to None when the script did not define the function.
203  ManipulatorPyFuncStep = py::hasattr(ManipulatorPyFunc, "step") ? py::getattr(ManipulatorPyFunc, "step") : py::none();
204  ManipulatorPyFuncExit = py::hasattr(ManipulatorPyFunc, "exit") ? py::getattr(ManipulatorPyFunc, "exit") : py::none();
205 // Create one InputStreams entry per locked input data stream (line 206 is missing from this listing).
207  for (size_t i = 0; i < ModuleData->GetInputDataStreams().GetList().size(); ++i)
208  {
209  auto& Instrument = ModuleData->GetInputDataStreams()[i];
210  auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
211  auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
212 
213  ManipulatorPyFuncInput.InputStreams.emplace_back(SampleStream->IsBasicSampleTimeUsed(), Instrument->GetValueUnit()); // Constructed from the stream's time-usage flag and the instrument's value unit.
215  } // NOTE(review): line 214 is missing from this listing.
216  for (size_t i = 0; i < ModuleData->GetOutputDataStreams().GetList().size(); ++i) // Same for the output data streams.
217  {
218  auto& Instrument = ModuleData->GetOutputDataStreams()[i];
219  auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
220  auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
221 
222  ManipulatorPyFuncInput.OutputStreams.emplace_back(SampleStream->IsBasicSampleTimeUsed(), Instrument->GetValueUnit());
223  }
224 
226 // NOTE(review): line 225 is missing from this listing.
227  IsInitialized = true; // Set last, after all visible setup steps have completed.
228  }
229 
231  { // NOTE(review): the signature line (230) is missing from this listing; presumably StreamManipulator::OnExit(DynExp::ModuleInstance* Instance) - confirm.
232  auto ModuleData = DynExp::dynamic_ModuleData_cast<StreamManipulator>(Instance->ModuleDataGetter());
233 // Unlock both stream containers before touching Python.
234  Instance->UnlockObject(ModuleData->GetInputDataStreams());
235  Instance->UnlockObject(ModuleData->GetOutputDataStreams());
236 
237  try
238  {
239  py::gil_scoped_acquire acquire; // Hold the Python GIL within this try block.
241  py::exec("del " + ManipulatorPyFuncName); // Delete the wrapper function from the Python scope (line 240 is missing from this listing).
242  }
243  catch (...)
244  {
245  // Swallow any exception which might arise from the shutdown of the module's Python part
246  // since a failure of that is not considered a severe error.
247  Util::EventLogger().Log("Shutting down Python part of module \"" + GetObjectName() + "\" failed.", Util::ErrorType::Warning);
248  }
249  }
250 }
PYBIND11_EMBEDDED_MODULE(PyModuleStreamManipulator, m)
PYBIND11_MAKE_OPAQUE(DynExpModule::PyStreamListType)
Implementation of a module to process data stored in data stream instrument(s) with a Python script a...
Base class for all circular data streams based on Util::circularbuf.
void WriteBasicSamples(const BasicSampleListType &Samples)
Writes a list of basic samples to the stream.
static void import()
Make the Python interpreter import this module as PyModuleDataStreamInstrument. GIL has to be acquire...
Definition: PyModules.cpp:49
void Init()
Called by ResetImpl(dispatch_tag<DynExp::ModuleDataBase>) overridden by this class to initialize the ...
void ResetImpl(dispatch_tag< ModuleDataBase >) override final
Refer to DynExp::ModuleDataBase::Reset(). Using tag dispatch mechanism to ensure that ResetImpl() of ...
void OnInit(DynExp::ModuleInstance *Instance) const override final
This event is triggered right before the module thread starts. Override it to lock instruments this m...
size_t NumFailedUpdateAttempts
Counts how often StreamManipulator::ModuleMainLoop() contiguously failed due to an exception of type ...
std::chrono::time_point< std::chrono::system_clock > LastManipulatorPyFuncExecution
Time point when the on_step() Python function was invoked last.
std::atomic< bool > IsInitialized
Indicates whether the module has been fully initialized by finishing OnInit(). The variable is atomic...
PyFuncType ManipulatorPyFuncStep
Handle to a Python function called for each manipulation step.
PyStreamManipulatorOutputData ManipulatorPyFuncOutput
Output data returned from the on_step() Python function.
Util::DynExpErrorCodes::DynExpErrorCodes ModuleMainLoop(DynExp::ModuleInstance &Instance) override final
Module main loop. The function is executed periodically by the module thread. Also refer to GetMainLo...
PyFuncType ManipulatorPyFuncExit
Handle to a Python function called on module termination.
void ResetImpl(dispatch_tag< ModuleBase >) override final
Refer to DynExp::Object::Reset(). Using tag dispatch mechanism to ensure that ResetImpl() of every de...
PyFuncType ManipulatorPyFuncInit
Handle to a Python function called on module initialization.
void OnExit(DynExp::ModuleInstance *Instance) const override final
This event is triggered right before the module thread terminates (not due to an exception,...
std::string ManipulatorPyFuncName
Unique name of the Python function all the code of this StreamManipulator instance is declared in....
PyStreamManipulatorInputData ManipulatorPyFuncInput
Input data passed to the on_step() Python function.
void Step(Util::SynchronizedPointer< ModuleDataType > &ModuleData)
Performs a single manipulation step by preparing input data for a call to the Python function on_step...
const std::unique_ptr< ModuleDataType > ModuleData
Module data belonging to this ModuleBase instance.
Definition: Module.h:743
Refer to ParamsBase::dispatch_tag.
Definition: Module.h:189
Defines data for a thread belonging to a ModuleBase instance. Refer to RunnableInstance.
Definition: Module.h:793
const ModuleBase::ModuleDataGetterType ModuleDataGetter
Getter for module's data. Refer to ModuleBase::ModuleDataGetterType.
Definition: Module.h:825
ItemIDType GetID() const noexcept
Returns the ID of this Object instance. Thread-safe since ID is const.
Definition: Object.h:2143
auto GetObjectName(const std::chrono::milliseconds Timeout=GetParamsTimeoutDefault) const
Returns the name of this Object instance.
Definition: Object.h:2128
Refer to ParamsBase::dispatch_tag.
Definition: Object.h:2018
const Object::ParamsGetterType ParamsGetter
Invoke to obtain the parameters (derived from ParamsBase) of Owner.
Definition: Object.h:3671
void UnlockObject(LinkedObjectWrapperContainer< ObjectT > &ObjectWrapperContainer)
Unlocks an Object instance stored in the LinkedObjectWrapperContainer ObjectWrapperContainer....
Definition: Object.h:3570
void LockObject(const ParamsBase::Param< ObjectLink< ObjectT >> &LinkParam, LinkedObjectWrapperContainer< ObjectT > &ObjectWrapperContainer, std::chrono::milliseconds Timeout=ObjectLinkBase::LockObjectTimeoutDefault)
Locks an Object instance referenced by a parameter LinkParam of type ParamsBase::Param< ObjectLink< O...
Definition: Object.h:3554
const auto & GetOwner() const noexcept
Returns Owner.
Definition: Object.h:3524
Logs events like errors and writes them immediately to a HTML file in a human-readable format....
Definition: Util.h:1061
void Log(const std::string &Message, const ErrorType Type=ErrorType::Info, const size_t Line=0, const std::string &Function="", const std::string &File="", const int ErrorCode=0, const std::stacktrace &Trace={}) noexcept
Logs an event from information specified manually.
Definition: Util.cpp:309
void Reset()
Removes the owned pybind11::object after locking the GIL.
Definition: PyUtil.h:113
Pointer to lock a class derived from ISynchronizedPointerLockable for synchronizing between threads....
Definition: Util.h:170
Thrown when an operation timed out before it could be completed, especially used for locking shared d...
Definition: Exception.h:261
DynExp's module namespace contains the implementation of DynExp modules which extend DynExp's core fu...
std::vector< DynExpInstr::PyDataStreamInstrument > PyStreamListType
Type of a list of data stream instruments made available to Python.
constexpr auto Instrument
DynExpErrorCodes
DynExp's error codes
Definition: Exception.h:22
std::string ToStr(const T &Value, int Precision=-1)
Converts a (numeric) value of type T to a std::string using operator<< of std::stringstream.
Definition: Util.h:625
static constexpr auto PyTab
Character sequence to indent a Python instruction by one level.
Definition: PyUtil.h:19
std::string ReadFromFile(const QString &Filename)
Reads the entire content from a text file.
Definition: QtUtil.cpp:250
Accumulates include statements to provide a precompiled header.
PyStreamListType OutputStreams
Vector of data stream instruments to write the resulting output samples to.
PyStreamListType InputStreams
Vector of data stream instruments with input samples to be manipulated.
void Reset()
Resets all member variables of this PyStreamManipulatorInputData instance back to their default value...
DynExp::ItemIDType ModuleID
ID of the module invoking the on_step() Python function for stream manipulation.
std::chrono::time_point< std::chrono::system_clock > LastExecutionTime
Time point when the on_step() Python function was invoked last.
Output data type returned from on_step() Python function.
void Reset()
Resets all member variables of this PyStreamManipulatorOutputData instance back to their default valu...
std::chrono::milliseconds MinNextExecutionDelay
Time to wait minimally before the on_step() Python function is called again when each input data stre...
std::chrono::milliseconds MaxNextExecutionDelay
Time to wait maximally before the on_step() Python function is called again when the input data strea...
std::vector< size_t > LastConsumedSampleIDsPerInputStream
Maintaining the order of input data streams in PyStreamManipulatorInputData::InputStreams as passed t...