DynExp
Highly flexible laboratory automation for dynamically changing experiments.
Loading...
Searching...
No Matches
StreamManipulator.cpp
Go to the documentation of this file.
1// This file is part of DynExp.
2
3#include "stdafx.h"
4#include "StreamManipulator.h"
5
8
9PYBIND11_EMBEDDED_MODULE(PyModuleStreamManipulator, m)
10{
11 using namespace DynExpModule;
12
13 py::bind_vector<DynExpModule::PyStreamListType>(m, "StreamListType");
14 py::bind_vector<decltype(DynExpModule::PyStreamManipulatorOutputData::LastConsumedSampleIDsPerInputStream)>(m, "SampleIDListType");
15
16 py::class_<PyStreamManipulatorInputData>(m, "InputData")
17 .def(py::init<>())
18 .def_readonly("ModuleID", &PyStreamManipulatorInputData::ModuleID)
19 .def_readonly("LastExecutionTime", &PyStreamManipulatorInputData::LastExecutionTime)
20 .def_readwrite("InputStreams", &PyStreamManipulatorInputData::InputStreams)
21 .def_readwrite("OutputStreams", &PyStreamManipulatorInputData::OutputStreams);
22
23 py::class_<PyStreamManipulatorOutputData>(m, "OutputData")
24 .def(py::init<>())
25 .def_readwrite("MinNextExecutionDelay", &PyStreamManipulatorOutputData::MinNextExecutionDelay)
26 .def_readwrite("MaxNextExecutionDelay", &PyStreamManipulatorOutputData::MaxNextExecutionDelay)
27 .def_readwrite("LastConsumedSampleIDsPerInputStream", &PyStreamManipulatorOutputData::LastConsumedSampleIDsPerInputStream);
28}
29
30namespace DynExpModule
31{
40
48
53
57
59 {
60 try
61 {
62 auto ModuleData = DynExp::dynamic_ModuleData_cast<StreamManipulator>(Instance.ModuleDataGetter());
63
64 bool IsNewDataAvlbl = false;
65 for (size_t i = 0; i < ManipulatorPyFuncInput.InputStreams.size(); ++i)
66 {
67 auto& Instrument = ModuleData->GetInputDataStreams()[i];
68 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
69 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
70
71 Instrument->ReadData();
72
74 SampleStream->GetNumRecentBasicSamples(ManipulatorPyFuncOutput.LastConsumedSampleIDsPerInputStream[i]))
75 IsNewDataAvlbl = true;
76 }
77
78 const auto now = std::chrono::system_clock::now();
81 !LastManipulatorPyFuncExecution.time_since_epoch().count())
83
85 } // ModuleData and instruments' data unlocked here.
86 catch (const Util::TimeoutException& e)
87 {
88 if (NumFailedUpdateAttempts++ >= 3)
89 Instance.GetOwner().SetWarning(e);
90 }
91
93 }
94
109
111 {
112 for (size_t i = 0; i < ManipulatorPyFuncInput.InputStreams.size(); ++i)
113 {
114 auto& Instrument = ModuleData->GetInputDataStreams()[i];
115 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
116 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
117
118 ManipulatorPyFuncInput.InputStreams[i].StreamSizeRead = SampleStream->GetStreamSizeRead();
119 ManipulatorPyFuncInput.InputStreams[i].StreamSizeWrite = SampleStream->GetStreamSizeWrite();
120 ManipulatorPyFuncInput.InputStreams[i].NumSamplesWritten = SampleStream->GetNumSamplesWritten();
121 ManipulatorPyFuncInput.InputStreams[i].Samples = SampleStream->ReadRecentBasicSamples(ManipulatorPyFuncOutput.LastConsumedSampleIDsPerInputStream[i]);
122 }
123 for (size_t i = 0; i < ManipulatorPyFuncInput.OutputStreams.size(); ++i)
124 {
125 auto& Instrument = ModuleData->GetOutputDataStreams()[i];
126 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
127 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
128
129 ManipulatorPyFuncInput.OutputStreams[i].StreamSizeRead = SampleStream->GetStreamSizeRead();
130 ManipulatorPyFuncInput.OutputStreams[i].StreamSizeWrite = SampleStream->GetStreamSizeWrite();
131 ManipulatorPyFuncInput.OutputStreams[i].NumSamplesWritten = SampleStream->GetNumSamplesWritten();
132 ManipulatorPyFuncInput.OutputStreams[i].Samples.clear();
133 }
134
136
138 {
139 py::gil_scoped_acquire acquire;
140
142 if (!PyResult.is_none())
143 FuncOutput = PyResult.cast<PyStreamManipulatorOutputData>();
144 } // GIL released here.
145
146 LastManipulatorPyFuncExecution = std::chrono::system_clock::now();
149
150 for (size_t i = 0; i < ManipulatorPyFuncInput.InputStreams.size(); ++i)
151 {
152 auto& Instrument = ModuleData->GetInputDataStreams()[i];
153 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
154 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
155
156 if (FuncOutput.LastConsumedSampleIDsPerInputStream.size() > i
157 && FuncOutput.LastConsumedSampleIDsPerInputStream[i] < SampleStream->GetNumSamplesWritten()
158 && FuncOutput.LastConsumedSampleIDsPerInputStream[i] >= SampleStream->GetNumSamplesWritten() - SampleStream->GetStreamSizeWrite())
160 else
161 ManipulatorPyFuncOutput.LastConsumedSampleIDsPerInputStream[i] = SampleStream->GetNumSamplesWritten();
162 }
163 for (size_t i = 0; i < ManipulatorPyFuncInput.OutputStreams.size(); ++i)
164 {
165 auto& Instrument = ModuleData->GetOutputDataStreams()[i];
166 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
167 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
168
169 if (!ManipulatorPyFuncInput.OutputStreams[i].Samples.empty())
170 {
172 Instrument->WriteData();
173 }
174 }
175 }
176
178 {
179 auto ModuleParams = DynExp::dynamic_Params_cast<StreamManipulator>(Instance->ParamsGetter());
180 auto ModuleData = DynExp::dynamic_ModuleData_cast<StreamManipulator>(Instance->ModuleDataGetter());
181
182 Instance->LockObject(ModuleParams->InputDataStreams, ModuleData->GetInputDataStreams());
183 Instance->LockObject(ModuleParams->OutputDataStreams, ModuleData->GetOutputDataStreams());
184
185 auto PythonCode = Util::ReadFromFile(ModuleParams->PythonCodePath.GetPath());
186 PythonCode = std::regex_replace(PythonCode, std::regex("\r\n"), "\n");
187 PythonCode = std::regex_replace(PythonCode, std::regex("\n"), std::string("\n") + Util::PyTab);
188
189 py::gil_scoped_acquire acquire;
191 py::exec("import PyModuleStreamManipulator as StreamManipulator");
192 py::exec("def " + ManipulatorPyFuncName + "():\n" +
193 Util::PyTab + PythonCode + "\n" +
194 Util::PyTab + "if 'on_init' in locals() and callable(on_init):\n" +
195 Util::PyTab + Util::PyTab + ManipulatorPyFuncName + ".init = on_init\n" +
196 Util::PyTab + "if 'on_step' in locals() and callable(on_step):\n" +
197 Util::PyTab + Util::PyTab + ManipulatorPyFuncName + ".step = on_step\n" +
198 Util::PyTab + "if 'on_exit' in locals() and callable(on_exit):\n" +
199 Util::PyTab + Util::PyTab + ManipulatorPyFuncName + ".exit = on_exit");
200 auto ManipulatorPyFunc = py::eval(ManipulatorPyFuncName);
201 ManipulatorPyFunc();
202 ManipulatorPyFuncInit = py::hasattr(ManipulatorPyFunc, "init") ? py::getattr(ManipulatorPyFunc, "init") : py::none();
203 ManipulatorPyFuncStep = py::hasattr(ManipulatorPyFunc, "step") ? py::getattr(ManipulatorPyFunc, "step") : py::none();
204 ManipulatorPyFuncExit = py::hasattr(ManipulatorPyFunc, "exit") ? py::getattr(ManipulatorPyFunc, "exit") : py::none();
205
207 for (size_t i = 0; i < ModuleData->GetInputDataStreams().GetList().size(); ++i)
208 {
209 auto& Instrument = ModuleData->GetInputDataStreams()[i];
210 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
211 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
212
213 ManipulatorPyFuncInput.InputStreams.emplace_back(SampleStream->IsBasicSampleTimeUsed(), Instrument->GetValueUnit());
215 }
216 for (size_t i = 0; i < ModuleData->GetOutputDataStreams().GetList().size(); ++i)
217 {
218 auto& Instrument = ModuleData->GetOutputDataStreams()[i];
219 auto InstrData = DynExp::dynamic_InstrumentData_cast<DynExpInstr::DataStreamInstrument>(Instrument->GetInstrumentData());
220 auto SampleStream = InstrData->GetCastSampleStream<SampleStreamType>();
221
222 ManipulatorPyFuncInput.OutputStreams.emplace_back(SampleStream->IsBasicSampleTimeUsed(), Instrument->GetValueUnit());
223 }
224
226
227 IsInitialized = true;
228 }
229
231 {
232 auto ModuleData = DynExp::dynamic_ModuleData_cast<StreamManipulator>(Instance->ModuleDataGetter());
233
234 Instance->UnlockObject(ModuleData->GetInputDataStreams());
235 Instance->UnlockObject(ModuleData->GetOutputDataStreams());
236
237 try
238 {
239 py::gil_scoped_acquire acquire;
241 py::exec("del " + ManipulatorPyFuncName);
242 }
243 catch (...)
244 {
245 // Swallow any exception which might arise from the shutdown of the module's Python part
246 // since a failure of that is not considered a severe error.
247 Util::EventLogger().Log("Shutting down Python part of module \"" + GetObjectName() + "\" failed.", Util::ErrorType::Warning);
248 }
249 }
250}
PYBIND11_EMBEDDED_MODULE(PyModuleStreamManipulator, m)
PYBIND11_MAKE_OPAQUE(DynExpModule::PyStreamListType)
Implementation of a module to process data stored in data stream instrument(s) with a Python script a...
Base class for all circular data streams based on Util::circularbuf.
void WriteBasicSamples(const BasicSampleListType &Samples)
Writes a list of basic sample to the stream.
static void import()
Make the Python interpreter import this module as PyModuleDataStreamInstrument. GIL has to be acquire...
Definition PyModules.cpp:49
void Init()
Called by ResetImpl(dispatch_tag<DynExp::ModuleDataBase>) overridden by this class to initialize the ...
void ResetImpl(dispatch_tag< ModuleDataBase >) override final
Refer to DynExp::ModuleDataBase::Reset(). Using tag dispatch mechanism to ensure that ResetImpl() of ...
void OnInit(DynExp::ModuleInstance *Instance) const override final
This event is triggered right before the module thread starts. Override it to lock instruments this m...
size_t NumFailedUpdateAttempts
Counts how often StreamManipulator::ModuleMainLoop() contiguously failed due to an exception of type ...
std::chrono::time_point< std::chrono::system_clock > LastManipulatorPyFuncExecution
Time point when the on_step() Python function was invoked last.
std::atomic< bool > IsInitialized
Indicates whether the module has been fully initialized by finishing OnInit(). The variable is atomic...
PyFuncType ManipulatorPyFuncStep
Handle to a Python function called for each manipulation step.
PyStreamManipulatorOutputData ManipulatorPyFuncOutput
Output data returned from the on_step() Python function.
Util::DynExpErrorCodes::DynExpErrorCodes ModuleMainLoop(DynExp::ModuleInstance &Instance) override final
Module main loop. The function is executed periodically by the module thread. Also refer to GetMainLo...
PyFuncType ManipulatorPyFuncExit
Handle to a Python function called on module termination.
void ResetImpl(dispatch_tag< ModuleBase >) override final
Refer to DynExp::Object::Reset(). Using tag dispatch mechanism to ensure that ResetImpl() of every de...
PyFuncType ManipulatorPyFuncInit
Handle to a Python function called on module initialization.
void OnExit(DynExp::ModuleInstance *Instance) const override final
This event is triggered right before the module thread terminates (not due to an exception,...
std::string ManipulatorPyFuncName
Unique name of the Python function all the code of this StreamManipulator instance is declared in....
PyStreamManipulatorInputData ManipulatorPyFuncInput
Input data passed to the on_step() Python function.
void Step(Util::SynchronizedPointer< ModuleDataType > &ModuleData)
Performs a single manipulation step by preparing input data for a call to the Python function on_step...
const std::unique_ptr< ModuleDataType > ModuleData
Module data belonging to this ModuleBase instance.
Definition Module.h:743
Refer to ParamsBase::dispatch_tag.
Definition Module.h:189
Defines data for a thread belonging to a ModuleBase instance. Refer to RunnableInstance.
Definition Module.h:793
const ModuleBase::ModuleDataGetterType ModuleDataGetter
Getter for module's data. Refer to ModuleBase::ModuleDataGetterType.
Definition Module.h:825
ItemIDType GetID() const noexcept
Returns the ID of this Object instance. Thread-safe since ID is const.
Definition Object.h:2143
auto GetObjectName(const std::chrono::milliseconds Timeout=GetParamsTimeoutDefault) const
Returns the name of this Object instance.
Definition Object.h:2128
Refer to ParamsBase::dispatch_tag.
Definition Object.h:2018
const Object::ParamsGetterType ParamsGetter
Invoke to obtain the parameters (derived from ParamsBase) of Owner.
Definition Object.h:3671
void UnlockObject(LinkedObjectWrapperContainer< ObjectT > &ObjectWrapperContainer)
Unlocks an Object instance stored in the LinkedObjectWrapperContainer ObjectWrapperContainer....
Definition Object.h:3570
void LockObject(const ParamsBase::Param< ObjectLink< ObjectT > > &LinkParam, LinkedObjectWrapperContainer< ObjectT > &ObjectWrapperContainer, std::chrono::milliseconds Timeout=ObjectLinkBase::LockObjectTimeoutDefault)
Locks an Object instance referenced by a parameter LinkParam of type ParamsBase::Param< ObjectLink< O...
Definition Object.h:3554
const auto & GetOwner() const noexcept
Returns Owner.
Definition Object.h:3524
Logs events like errors and writes them immediately to a HTML file in a human-readable format....
Definition Util.h:1061
void Log(const std::string &Message, const ErrorType Type=ErrorType::Info, const size_t Line=0, const std::string &Function="", const std::string &File="", const int ErrorCode=0, const std::stacktrace &Trace={}) noexcept
Logs an event from information specified manually.
Definition Util.cpp:309
void Reset()
Removes the owned pybind11::object after locking the GIL.
Definition PyUtil.h:113
Pointer to lock a class derived from ISynchronizedPointerLockable for synchronizing between threads....
Definition Util.h:170
Thrown when an operation timed out before it could be completed, especially used for locking shared d...
Definition Exception.h:261
DynExp's module namespace contains the implementation of DynExp modules which extend DynExp's core fu...
std::vector< DynExpInstr::PyDataStreamInstrument > PyStreamListType
Type of a list of data stream instruments made available to Python.
DynExpErrorCodes
DynExp's error codes
Definition Exception.h:22
std::string ToStr(const T &Value, int Precision=-1)
Converts a (numeric) value of type T to a std::string using operator<< of std::stringstream.
Definition Util.h:625
static constexpr auto PyTab
Character sequence to indent a Python instruction by one level.
Definition PyUtil.h:19
std::string ReadFromFile(const QString &Filename)
Reads the entire content from a text file.
Definition QtUtil.cpp:250
Accumulates include statements to provide a precompiled header.
PyStreamListType OutputStreams
Vector of data stream instruments to write the resulting output samples to.
PyStreamListType InputStreams
Vector of data stream instruments with input samples to be manipulated.
void Reset()
Resets all member variables of this PyStreamManipulatorInputData instance back to their default value...
DynExp::ItemIDType ModuleID
ID of the module invoking the on_step() Python function for stream manipulation.
std::chrono::time_point< std::chrono::system_clock > LastExecutionTime
Time point when the on_step() Python function was invoked last.
Output data type returned from on_step() Python function.
void Reset()
Resets all member variables of this PyStreamManipulatorOutputData instance back to their default valu...
std::chrono::milliseconds MinNextExecutionDelay
Time wo wait minimally before the on_step() Python function is called again when each input data stre...
std::chrono::milliseconds MaxNextExecutionDelay
Time to wait maximally before the on_step() Python function is called again when the input data strea...
std::vector< size_t > LastConsumedSampleIDsPerInputStream
Maintaining the order of input data streams in PyStreamManipulatorInputData::InputStreams as passed t...