14 #include "NetworkDataStreamInstrument.pb.h"
15 #include "NetworkDataStreamInstrument.grpc.pb.h"
/**
 * Forward declaration of the class template NetworkDataStreamInstrumentT.
 *
 * Template parameters:
 *  - BaseInstr: the instrument type this template is built on.
 *  - (unnamed non-type parameter): its type is
 *    std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>, int>,
 *    so the declaration is only well-formed when BaseInstr is
 *    DataStreamInstrument or derives from it.
 *  - gRPCStubs: a pack of types. NOTE(review): presumably the gRPC stub
 *    classes used to talk to the remote site - confirm against gRPCInstrument.
 */
19 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
20 class NetworkDataStreamInstrumentT;
35 default:
throw Util::InvalidDataException(
"The given unit does not exist in the DataStreamInstrumentData::UnitType enumeration. Did you forget to adjust the UnitType enumeration in class \"DataStreamInstrumentData\"?");
50 default:
throw Util::InvalidDataException(
"The given unit does not exist in the DynExpProto::Common::UnitType enumeration. Did you forget to adjust the UnitType enumeration in file \"Common.proto\"?");
54 namespace NetworkDataStreamInstrumentTasks
56 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
65 StubPtr = InstrData->template GetStub<DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument>();
68 auto Response =
InvokeStubFunc(StubPtr, &DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument::Stub::GetStreamInfo, {});
74 InstrData->RemoteStreamInfo.HardwareMinValue = Response.hardwareminvalue();
75 InstrData->RemoteStreamInfo.HardwareMaxValue = Response.hardwaremaxvalue();
76 InstrData->RemoteStreamInfo.IsBasicSampleTimeUsed = Response.isbasicsampletimeused();
77 InstrData->RemoteStreamInfo.StreamSizeRead = Util::NumToT<size_t>(Response.streamsizemsg().streamsizeread());
78 InstrData->RemoteStreamInfo.StreamSizeWrite = Util::NumToT<size_t>(Response.streamsizemsg().streamsizewrite());
80 InstrData->GetSampleStream()->SetStreamSize(InstrData->RemoteStreamInfo.StreamSizeWrite);
89 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
100 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
108 StubPtr = InstrData->template GetStub<DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument>();
111 auto StreamSizeResponse =
InvokeStubFunc(StubPtr, &DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument::Stub::GetStreamSize, {});
112 auto FinishedResponse =
InvokeStubFunc(StubPtr, &DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument::Stub::HasFinished, {});
113 auto RunningResponse =
InvokeStubFunc(StubPtr, &DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument::Stub::IsRunning, {});
118 InstrData->RemoteStreamInfo.StreamSizeRead = Util::NumToT<size_t>(StreamSizeResponse.streamsizeread());
119 InstrData->RemoteStreamInfo.StreamSizeWrite = Util::NumToT<size_t>(StreamSizeResponse.streamsizewrite());
123 if (InstrData->GetSampleStream()->GetStreamSizeWrite() != InstrData->RemoteStreamInfo.StreamSizeWrite)
125 InstrData->GetSampleStream()->SetStreamSize(InstrData->RemoteStreamInfo.StreamSizeWrite);
126 InstrData->SetLastReadRemoteSampleID(0);
136 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
145 DynExpProto::NetworkDataStreamInstrument::ReadMessage ReadMsg;
150 StubPtr = InstrData->template GetStub<DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument>();
151 ReadMsg.set_startsampleid(Util::NumToT<google::protobuf::uint64>(InstrData->GetLastReadRemoteSampleID()));
154 auto ReadResultMsg =
InvokeStubFunc(StubPtr, &DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument::Stub::Read, ReadMsg);
157 for (decltype(ReadResultMsg.samples_size()) i = 0; i < ReadResultMsg.samples_size(); ++i)
158 InstrData->GetSampleStream()->WriteBasicSample({ ReadResultMsg.samples(i).value(), ReadResultMsg.samples(i).time() });
160 InstrData->SetLastReadRemoteSampleID(Util::NumToT<size_t>(ReadResultMsg.lastsampleid()));
166 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
176 std::vector<NetworkDataStreamInstrumentDataSampleStreamType::SampleType> Samples;
179 auto SampleStream = InstrData->template GetCastSampleStream<NetworkDataStreamInstrumentDataSampleStreamType>();
181 if (SampleStream->GetNumSamplesWritten() == InstrData->GetLastWrittenSampleID())
183 if (SampleStream->GetNumSamplesWritten() < InstrData->GetLastWrittenSampleID())
184 InstrData->SetLastWrittenSampleID(0);
186 StubPtr = InstrData->template GetStub<DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument>();
188 Samples = SampleStream->ReadRecentBasicSamples(InstrData->GetLastWrittenSampleID());
189 InstrData->SetLastWrittenSampleID(SampleStream->GetNumSamplesWritten());
192 DynExpProto::NetworkDataStreamInstrument::WriteMessage WriteMsg;
193 for (
const auto& Sample : Samples)
195 auto BasicSampleMsg = WriteMsg.add_samples();
196 BasicSampleMsg->set_value(Sample.Value);
197 BasicSampleMsg->set_time(Sample.Time);
200 InvokeStubFunc(StubPtr, &DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument::Stub::Write, WriteMsg);
206 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
218 StubPtr = InstrData->template GetStub<DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument>();
221 InvokeStubFunc(StubPtr, &DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument::Stub::ClearData, {});
227 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
239 StubPtr = InstrData->template GetStub<DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument>();
242 InvokeStubFunc(StubPtr, &DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument::Stub::Start, {});
248 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
260 StubPtr = InstrData->template GetStub<DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument>();
263 InvokeStubFunc(StubPtr, &DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument::Stub::Stop, {});
269 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
281 StubPtr = InstrData->template GetStub<DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument>();
284 InvokeStubFunc(StubPtr, &DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument::Stub::Restart, {});
290 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
300 DynExpProto::NetworkDataStreamInstrument::StreamSizeMessage StreamSizeMsg;
301 StreamSizeMsg.set_streamsizewrite(Util::NumToT<google::protobuf::uint64>(
StreamSizeInSamples));
306 StubPtr = InstrData->template GetStub<DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument>();
309 auto Response =
InvokeStubFunc(StubPtr, &DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument::Stub::SetStreamSize, StreamSizeMsg);
313 if (InstrData->GetSampleStream()->GetStreamSizeWrite() != Response.streamsizewrite())
315 InstrData->GetSampleStream()->
SetStreamSize(Response.streamsizewrite());
316 InstrData->SetLastReadRemoteSampleID(0);
326 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
338 StubPtr = InstrData->template GetStub<DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument>();
341 auto Response =
InvokeStubFunc(StubPtr, &DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument::Stub::ResetStreamSize, {});
345 if (InstrData->GetSampleStream()->GetStreamSizeWrite() != Response.streamsizewrite())
347 InstrData->GetSampleStream()->
SetStreamSize(Response.streamsizewrite());
348 InstrData->SetLastReadRemoteSampleID(0);
357 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
413 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
/**
 * Returns the constant tag string "NetworkDataStreamInstrumentParams".
 * Overrides a base-class virtual function and is declared noexcept.
 * NOTE(review): presumably this tag names the parameter class for
 * identification elsewhere in the project - confirm against the base class.
 */
420 virtual const char*
GetParamClassTag() const noexcept
override {
return "NetworkDataStreamInstrumentParams"; }
429 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
449 template <
typename BaseInstr,
typename std::enable_if_t<std::is_base_of_v<DataStreamInstrument, BaseInstr>,
int>,
typename... gRPCStubs>
/**
 * Returns the string literal "Network Data Stream Instrument".
 * Declared constexpr, static and noexcept, so it is usable without an
 * instance and in constant expressions.
 * NOTE(review): presumably this is the instrument type's display name
 * (a GetName() member appears in the symbol index) - confirm.
 */
457 constexpr
static auto Name() noexcept {
return "Network Data Stream Instrument"; }
460 :
gRPCInstrument<BaseInstr, 0, gRPCStubs...>(OwnerThreadID, std::move(Params)) {}
/**
 * Returns a fixed delay of 500 milliseconds.
 * NOTE(review): presumably this is the interval at which the instrument's
 * task queue is serviced so that the remote instrument's state is read
 * periodically - confirm against the base instrument class.
 */
469 virtual std::chrono::milliseconds
GetTaskQueueDelay()
const {
return std::chrono::milliseconds(500); }
473 auto InstrData = dynamic_InstrumentData_cast<NetworkDataStreamInstrumentT>(this->GetInstrumentData());
474 return InstrData->HasFinished();
479 auto InstrData = dynamic_InstrumentData_cast<NetworkDataStreamInstrumentT>(this->GetInstrumentData());
480 return InstrData->IsRunning();
521 auto InstrData = dynamic_InstrumentData_cast<NetworkDataStreamInstrumentT>(this->GetInstrumentData());
522 return InstrData->GetValueUnit();
Implementation of a data stream meta instrument and of data streams input/output devices might work o...
Implements a circular data stream based on Util::circularbuf using samples of type BasicSample.
UnitType
Units which can be used for data stream instruments.
@ Ampere
Electric current in Ampere (A)
@ Arbitrary
Arbitrary units (a.u.)
@ LogicLevel
Logic level (TTL) units (1 or 0)
@ Power_W
Power in Watt (W)
@ Volt
Voltage in Volt (V)
@ Counts
Count rate in counts per second (cps)
Implementation of the data stream meta instrument, which is a base class for all instruments reading/...
virtual DynExp::ParamsBasePtrType MakeParams(DynExp::ItemIDType ID, const DynExp::DynExpCore &Core) const override
Override to make derived classes call DynExp::MakeParams with the correct configurator type derived f...
virtual ~NetworkDataStreamInstrumentConfigurator()=default
NetworkDataStreamInstrumentConfigurator()=default
void ResetImpl(DynExp::InstrumentDataBase::dispatch_tag< gRPCInstrumentData< BaseInstr, 0, gRPCStubs... >>) override final
Util::OptionalBool Finished
NetworkDataStreamInstrumentData(size_t BufferSizeInSamples=1)
size_t LastReadRemoteSampleID
ID of the last sample read from the remote site and written to the assigned data stream.
RemoteStreamInfoType RemoteStreamInfo
size_t LastWrittenSampleID
ID of the last sample read from the assigned data stream and written to the remote site.
void SetLastWrittenSampleID(size_t SampleID) noexcept
virtual void ResetImpl(DynExp::InstrumentDataBase::dispatch_tag< NetworkDataStreamInstrumentData >)
virtual ~NetworkDataStreamInstrumentData()=default
auto HasFinished() const noexcept
void SetLastReadRemoteSampleID(size_t SampleID) noexcept
Util::OptionalBool Running
DataStreamInstrumentData::UnitType ValueUnit
auto GetLastReadRemoteSampleID() const noexcept
bool IsBasicSampleTimeUsed
auto IsRunning() const noexcept
const auto & GetRemoteStreamInfo() const noexcept
auto GetLastWrittenSampleID() const noexcept
void ConfigureParamsImpl(DynExp::ParamsBase::dispatch_tag< gRPCInstrumentParams< BaseInstr, 0, gRPCStubs... >>) override final
DynExp::ParamsBase::DummyParam Dummy
virtual ~NetworkDataStreamInstrumentParams()=default
virtual const char * GetParamClassTag() const noexcept override
This function is intended to be overridden once in each derived class returning the name of the respe...
NetworkDataStreamInstrumentParams(DynExp::ItemIDType ID, const DynExp::DynExpCore &Core)
virtual void ConfigureParamsImpl(DynExp::ParamsBase::dispatch_tag< NetworkDataStreamInstrumentParams >)
Data stream instrument for bidirectional gRPC communication.
virtual std::unique_ptr< DynExp::UpdateTaskBase > MakeUpdateTask() const override
Factory function for an update task (UpdateTaskBase). Override to define the desired update task in d...
virtual std::string GetName() const override
Returns the name of this Object type.
NetworkDataStreamInstrumentT(const std::thread::id OwnerThreadID, DynExp::ParamsBasePtrType &&Params)
virtual void ClearData(DynExp::TaskBase::CallbackType CallbackFunc=nullptr) const override
virtual void Stop(DynExp::TaskBase::CallbackType CallbackFunc=nullptr) const override
constexpr static auto Name() noexcept
virtual Util::OptionalBool IsRunning() const override
virtual ~NetworkDataStreamInstrumentT()
virtual std::unique_ptr< DynExp::InitTaskBase > MakeInitTask() const override
Factory function for an init task (InitTaskBase). Override to define the desired initialization task ...
virtual Util::OptionalBool HasFinished() const override
virtual void Start(DynExp::TaskBase::CallbackType CallbackFunc=nullptr) const override
virtual void SetStreamSize(size_t BufferSizeInSamples, DynExp::TaskBase::CallbackType CallbackFunc=nullptr) const override
virtual void ResetStreamSize(DynExp::TaskBase::CallbackType CallbackFunc=nullptr) const override
void ResetImpl(DynExp::Object::dispatch_tag< gRPCInstrument< BaseInstr, 0, gRPCStubs... >>) override final
virtual void ResetImpl(DynExp::Object::dispatch_tag< NetworkDataStreamInstrumentT >)
virtual std::chrono::milliseconds GetTaskQueueDelay() const
Read remote instrument's state periodically.
virtual void WriteData(DynExp::TaskBase::CallbackType CallbackFunc=nullptr) const override
virtual void Restart(DynExp::TaskBase::CallbackType CallbackFunc=nullptr) const override
virtual void ReadData(DynExp::TaskBase::CallbackType CallbackFunc=nullptr) const override
virtual std::unique_ptr< DynExp::ExitTaskBase > MakeExitTask() const override
Factory function for an exit task (ExitTaskBase). Override to define the desired deinitialization tas...
virtual DynExp::TaskResultType RunChild(DynExp::InstrumentInstance &Instance) override
Runs the task. Override RunChild() to define a derived task's action(s). Any exception leaving RunChi...
ClearTask(CallbackType CallbackFunc) noexcept
virtual void ExitFuncImpl(DynExp::ExitTaskBase::dispatch_tag< ExitTask >, DynExp::InstrumentInstance &Instance)
void ExitFuncImpl(DynExp::ExitTaskBase::dispatch_tag< gRPCInstrumentTasks::ExitTask< BaseInstr, 0, gRPCStubs... >>, DynExp::InstrumentInstance &Instance) override final
void InitFuncImpl(DynExp::InitTaskBase::dispatch_tag< gRPCInstrumentTasks::InitTask< BaseInstr, 0, gRPCStubs... >>, DynExp::InstrumentInstance &Instance) override final
virtual void InitFuncImpl(DynExp::InitTaskBase::dispatch_tag< InitTask >, DynExp::InstrumentInstance &Instance)
ReadTask(CallbackType CallbackFunc) noexcept
virtual DynExp::TaskResultType RunChild(DynExp::InstrumentInstance &Instance) override
Runs the task. Override RunChild() to define a derived task's action(s). Any exception leaving RunChi...
virtual DynExp::TaskResultType RunChild(DynExp::InstrumentInstance &Instance) override
Runs the task. Override RunChild() to define a derived task's action(s). Any exception leaving RunChi...
ResetStreamSizeTask(CallbackType CallbackFunc) noexcept
RestartTask(CallbackType CallbackFunc) noexcept
virtual DynExp::TaskResultType RunChild(DynExp::InstrumentInstance &Instance) override
Runs the task. Override RunChild() to define a derived task's action(s). Any exception leaving RunChi...
const size_t StreamSizeInSamples
virtual DynExp::TaskResultType RunChild(DynExp::InstrumentInstance &Instance) override
Runs the task. Override RunChild() to define a derived task's action(s). Any exception leaving RunChi...
SetStreamSizeTask(size_t StreamSizeInSamples, CallbackType CallbackFunc) noexcept
virtual DynExp::TaskResultType RunChild(DynExp::InstrumentInstance &Instance) override
Runs the task. Override RunChild() to define a derived task's action(s). Any exception leaving RunChi...
StartTask(CallbackType CallbackFunc) noexcept
virtual DynExp::TaskResultType RunChild(DynExp::InstrumentInstance &Instance) override
Runs the task. Override RunChild() to define a derived task's action(s). Any exception leaving RunChi...
StopTask(CallbackType CallbackFunc) noexcept
void UpdateFuncImpl(DynExp::UpdateTaskBase::dispatch_tag< gRPCInstrumentTasks::UpdateTask< BaseInstr, 0, gRPCStubs... >>, DynExp::InstrumentInstance &Instance) override final
virtual void UpdateFuncImpl(DynExp::UpdateTaskBase::dispatch_tag< UpdateTask >, DynExp::InstrumentInstance &Instance)
virtual DynExp::TaskResultType RunChild(DynExp::InstrumentInstance &Instance) override
Runs the task. Override RunChild() to define a derived task's action(s). Any exception leaving RunChi...
WriteTask(CallbackType CallbackFunc) noexcept
Explicit instantiation of derivable class NetworkDataStreamInstrumentT to create the network data str...
DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument StubType
virtual DataStreamInstrumentData::UnitType GetValueUnit() const override
virtual ~NetworkDataStreamInstrument()
NetworkDataStreamInstrument(const std::thread::id OwnerThreadID, DynExp::ParamsBasePtrType &&Params)
Configurator class for gRPCInstrument.
Data class for gRPCInstrument.
Parameter class for gRPCInstrument.
Defines a task for deinitializing an instrument within an instrument inheritance hierarchy....
Defines a task for initializing an instrument within an instrument inheritance hierarchy....
Defines a task for updating an instrument within an instrument inheritance hierarchy....
Meta instrument template for transforming meta instruments into network instruments,...
DynExp's core class acts as the interface between the user interface and DynExp's internal data like ...
Refer to DynExp::ParamsBase::dispatch_tag.
Refer to DynExp::ParamsBase::dispatch_tag.
void MakeAndEnqueueTask(ArgTs &&...Args) const
Calls MakeTask() to construct a new task and subsequently enqueues the task into the instrument's tas...
Refer to ParamsBase::dispatch_tag.
Defines data for a thread belonging to an InstrumentBase instance. Refer to RunnableInstance.
const InstrumentBase::InstrumentDataGetterType InstrumentDataGetter
Getter for instrument's data. Refer to InstrumentBase::InstrumentDataGetterType.
Refer to ParamsBase::dispatch_tag.
Dummy parameter which is to be owned once by parameter classes that do not contain any other paramete...
Tag for function dispatching mechanism within this class used when derived classes are not intended t...
Base class for all tasks being processed by instruments. The class must not contain public virtual fu...
std::function< void(const TaskBase &, ExceptionContainer &)> CallbackType
Type of a callback function which is invoked when a task has finished, failed or has been aborted....
TaskBase(CallbackType CallbackFunc=nullptr) noexcept
Constructs an instrument task.
const CallbackType CallbackFunc
This callback function is called after the task has finished (either successfully or not) with a refe...
Defines the return type of task functions.
Refer to DynExp::ParamsBase::dispatch_tag.
Data to operate on is invalid for a specific purpose. This indicates a corrupted data structure or fu...
Data type which stores an optional bool value (unknown, false, true). The type evaluates to bool whil...
Defines a meta instrument template for transforming meta instruments into network instruments,...
DynExp's instrument namespace contains the implementation of DynExp instruments which extend DynExp's...
constexpr DynExpProto::Common::UnitType ToPrototUnitType(DataStreamInstrumentData::UnitType Unit)
BasicSampleStream NetworkDataStreamInstrumentDataSampleStreamType
ResponseMsgType InvokeStubFunc(StubPtrType< gRPCStub > StubPtr, StubFuncPtrType< gRPCStub, RequestMsgType, ResponseMsgType > StubFunc, const RequestMsgType &RequestMsg)
Invokes a gRPC stub function as a remote procedure call. Waits for a fixed amount of time (2 seconds)...
std::shared_ptr< typename gRPCStub::Stub > StubPtrType
Alias for a pointer to a gRPC stub.
constexpr DataStreamInstrumentData::UnitType ToDataStreamInstrumentUnitType(DynExpProto::Common::UnitType Unit)
NetworkDataStreamInstrumentT< DynExpProto::NetworkDataStreamInstrument::NetworkDataStreamInstrument > NetworkDataStreamInstrument
Explicit instantiation of derivable class NetworkDataStreamInstrumentT to create the network data str...
std::unique_ptr< TaskT > MakeTask(ArgTs &&...Args)
Factory function to create a task to be enqueued in an instrument's task queue.
auto dynamic_InstrumentData_cast(Util::SynchronizedPointer< From > &&InstrumentDataPtr)
Casts the data base class From into a derived InstrumentBase's (To) data class keeping the data locke...
std::unique_ptr< ParamsBase > ParamsBasePtrType
Alias for a pointer to the parameter system base class ParamsBase.
size_t ItemIDType
ID type of objects/items managed by DynExp.
Accumulates include statements to provide a precompiled header.