Highly flexible laboratory automation for dynamically changing experiments.
1 // This file is part of DynExp.
9 #pragma once
11 #include "stdafx.h"
12 #include "Instrument.h"
14 namespace DynExpInstr
15 {
16  class DataStreamInstrument;
21  namespace DataStreamInstrumentTasks
22  {
27  {
34  };
40  {
47  };
53  {
60  };
// Task type that carries a stream buffer size given in samples. It derives from
// DynExp::TaskBase and is final, so no further subclasses are possible.
// NOTE(review): the constructor and the task body are not visible in this excerpt.
65  class SetStreamSizeTask final : public DynExp::TaskBase
66  {
67  public:
75  private:
// Buffer size in samples. Declared const, so it is fixed when the task is constructed.
78  const size_t BufferSizeInSamples;
79  };
80  }
// Plain sample type made of a value and a time, both stored as DataType (double).
struct BasicSample
{
    using DataType = double;

    // Creates a sample with value 0 and time 0.
    constexpr BasicSample() noexcept : Value(0), Time(0) {}

    // Creates a sample holding Value with time 0. Deliberately left non-explicit so
    // that a plain DataType still converts implicitly to a BasicSample.
    constexpr BasicSample(DataType Value) noexcept : Value(Value), Time(0) {}

    // Creates a sample holding Value with time Time.
    constexpr BasicSample(DataType Value, DataType Time) noexcept : Value(Value), Time(Time) {}

    // Data members initialized by the constructors above and accessed directly by
    // stream code (Sample.Value / Sample.Time). Declared in initialization order.
    DataType Value;
    DataType Time;
};
116  {
117  public:
// List type used when several BasicSample objects are passed at once.
121  using BasicSampleListType = std::vector<BasicSample>;
// Virtual destructor so that derived streams can be destroyed through this base.
123  virtual ~DataStreamBase() = default;
// Reports whether the stream supports BasicSample conversion. Defaults to false;
// BasicSampleStream and NumericSampleStream override it to return true.
135  virtual bool IsBasicSampleConvertible() const noexcept { return false; }
// Reports whether the time field of BasicSample is used. Defaults to false;
// BasicSampleStream overrides it to return true.
144  virtual bool IsBasicSampleTimeUsed() const noexcept { return false; }
// Seek operations. Which selects the read (in) and/or write (out) position and
// defaults to both. SeekEqual additionally returns a success flag.
151  virtual void SeekBeg(std::ios_base::openmode Which = std::ios_base::in | std::ios_base::out) = 0;
159  virtual void SeekEnd(std::ios_base::openmode Which = std::ios_base::in | std::ios_base::out) = 0;
167  virtual bool SeekEqual(std::ios_base::openmode Which = std::ios_base::in | std::ios_base::out) = 0;
// Size queries, implemented by derived classes. CanRead() treats a non-zero
// GetStreamSizeRead() as "readable".
173  virtual size_t GetStreamSizeRead() const noexcept = 0;
179  virtual size_t GetStreamSizeWrite() const noexcept = 0;
// Number of samples written so far, as tracked by the derived class.
187  virtual size_t GetNumSamplesWritten() const noexcept = 0;
// Changes the stream's buffer size; the size is given in samples.
193  virtual void SetStreamSize(size_t BufferSizeInSamples) = 0;
201  bool CanRead() const { return GetStreamSizeRead(); }
206  void Clear() { ClearChild(); }
212  void WriteBasicSample(const BasicSample& Sample) { WriteBasicSampleChild(Sample); }
224  void WriteBasicSamples(const BasicSampleListType& Samples);
232  private:
// Hook called by WriteBasicSample(). Not pure virtual, so a default implementation
// is expected elsewhere; BasicSampleStream and NumericSampleStream override it.
242  virtual void WriteBasicSampleChild(const BasicSample& Sample);
// Hook called by Clear(). Pure virtual: every concrete stream must implement it.
251  virtual void ClearChild() = 0;
253  };
// Alias for the owning pointer type used for data streams: std::unique_ptr<T>.
258  template <typename T>
259  using DataStreamPtrType = std::unique_ptr<T>;
270  {
271  protected:
// Virtual destructor, declared under the protected: label that precedes it.
273  virtual ~CircularDataStreamBase() = default;
275  public:
// Sample-count queries, implemented by the derived stream.
285  virtual size_t GetNumAvailableSamplesToReadTillEnd() const noexcept = 0;
292  virtual size_t GetNumFreeSamplesToWrite() const noexcept = 0;
// Current read/write positions. CircularDataStream implements these as the buffer's
// byte position divided by the sample size, i.e. positions counted in samples.
299  virtual std::streampos GetReadPosition() const noexcept = 0;
306  virtual std::streampos GetWritePosition() const noexcept = 0;
// Moves the position(s) selected by Which by OffsetInSamples relative to SeekDir.
// Returns a success flag.
317  virtual bool SeekRel(signed long long OffsetInSamples, std::ios_base::seekdir SeekDir,
318  std::ios_base::openmode Which = std::ios_base::in | std::ios_base::out) = 0;
// Moves the position(s) selected by Which to the absolute position PositionInSamples.
// Returns a success flag.
327  virtual bool SeekAbs(unsigned long long PositionInSamples, std::ios_base::openmode Which = std::ios_base::in | std::ios_base::out) = 0;
// Non-virtual helper taking a sample count and returning a sample count. Defined out of line.
// NOTE(review): exact semantics are not visible in this excerpt.
339  size_t GetNumRecentBasicSamples(size_t Count) const;
348  };
354  template <
355  typename SampleT,
356  std::enable_if_t<std::is_trivially_copyable_v<SampleT>, int> = 0
357  >
359  {
360  public:
361  using SampleType = SampleT;
367  CircularDataStream(size_t BufferSizeInSamples)
368  : StreamBuffer(sizeof(SampleT) * BufferSizeInSamples), Stream(&StreamBuffer), NumSamplesWritten(0)
369  {
370  Stream.exceptions(std::iostream::failbit | std::iostream::badbit);
371  }
373  virtual ~CircularDataStream() = default;
379  constexpr auto GetBytesPerSample() noexcept { return sizeof(SampleT); }
381  size_t GetNumAvailableSamplesToReadTillEnd() const noexcept override { return (StreamBuffer.gsize() - StreamBuffer.gtellp()) / sizeof(SampleT); }
382  size_t GetNumFreeSamplesToWrite() const noexcept override { return (StreamBuffer.psize() - StreamBuffer.ptellp()) / sizeof(SampleT); }
383  std::streampos GetReadPosition() const noexcept override { return StreamBuffer.gtellp() / sizeof(SampleT); }
384  std::streampos GetWritePosition() const noexcept override { return StreamBuffer.ptellp() / sizeof(SampleT); }
386  virtual bool SeekRel(signed long long OffsetInSamples, std::ios_base::seekdir SeekDir,
387  std::ios_base::openmode Which = std::ios_base::in | std::ios_base::out) override
388  {
389  return StreamBuffer.pubseekoff(OffsetInSamples * sizeof(SampleT), SeekDir, Which) !=
390  Util::circularbuf::pos_type(Util::circularbuf::off_type(-1));
391  }
393  virtual bool SeekAbs(unsigned long long PositionInSamples,
394  std::ios_base::openmode Which = std::ios_base::in | std::ios_base::out) override
395  {
396  return StreamBuffer.pubseekpos(PositionInSamples * sizeof(SampleT), Which) !=
397  Util::circularbuf::pos_type(Util::circularbuf::off_type(-1));
398  }
400  virtual void SeekBeg(std::ios_base::openmode Which = std::ios_base::in | std::ios_base::out) override
401  {
402  SeekAbs(0, Which);
403  }
405  virtual void SeekEnd(std::ios_base::openmode Which = std::ios_base::in | std::ios_base::out) override
406  {
407  if (Which & std::ios_base::in)
408  SeekRel(-1, std::ios_base::end, std::ios_base::in);
409  if (Which & std::ios_base::out)
410  SeekRel(0, std::ios_base::end, std::ios_base::out);
411  }
413  virtual bool SeekEqual(std::ios_base::openmode Which = std::ios_base::in | std::ios_base::out) override
414  {
415  if (Which & std::ios_base::in)
416  {
417  StreamBuffer.pubsync();
420  SeekAbs(GetWritePosition(), std::ios_base::in);
421  else
422  return false;
423  }
425  // Read buffer is never larger than write buffer.
426  if (Which & std::ios_base::out)
427  SeekAbs(GetReadPosition(), std::ios_base::out);
429  return true;
430  }
432  virtual size_t GetStreamSizeRead() const noexcept override { return StreamBuffer.gsize() / sizeof(SampleT);}
433  virtual size_t GetStreamSizeWrite() const noexcept override { return StreamBuffer.psize() / sizeof(SampleT); }
434  virtual size_t GetNumSamplesWritten() const noexcept override { return NumSamplesWritten; }
435  virtual void SetStreamSize(size_t BufferSizeInSamples) override { StreamBuffer.resize(sizeof(SampleT) * BufferSizeInSamples); }
445  void WriteSample(const SampleT& Sample)
446  {
447  ValidateSample(Sample);
449  Stream.write(reinterpret_cast<const char*>(&Sample), sizeof(SampleT));
451  // Should never overflow. Let's assume 10 Gb/s = 1.25e9 B/s transmission rate:
452  // Overflows in (2^64 - 1) B / 1.25e9 B/s / 60 / 60 / 24 / 365 = 468 years.
453  if (NumSamplesWritten < std::numeric_limits<decltype(NumSamplesWritten)>::max())
454  ++NumSamplesWritten;
455  }
// Overload for std::chrono durations: writes the duration's tick count as a sample.
template <typename Rep, typename Period>
void WriteSample(const std::chrono::duration<Rep, Period>& Sample)
{
    const auto TickCount = Sample.count();
    WriteSample(TickCount);
}
475  SampleT ReadSample()
476  {
477  SampleT Sample;
478  Stream.read(reinterpret_cast<char*>(&Sample), sizeof(SampleT));
480  return Sample;
481  }
// Writes every element of Samples, in order, through WriteSample().
template <typename T>
void WriteSamples(const std::vector<T>& Samples)
{
    for (auto It = Samples.cbegin(); It != Samples.cend(); ++It)
        WriteSample(*It);
}
500  std::vector<SampleT> ReadSamples(size_t Count)
501  {
502  std::vector<SampleT> Samples;
504  for (decltype(Count) i = 0; i < Count; ++i)
505  Samples.push_back(ReadSample());
507  return Samples;
508  }
510  protected:
// Implements the ClearChild() hook, which DataStreamBase::Clear() forwards to.
// Note that the NumSamplesWritten counter is not modified here.
511  virtual void ClearChild() override
512  {
// Reset the I/O stream's error state flags ...
513  Stream.clear();
// ... then clear the underlying stream buffer.
514  StreamBuffer.clear();
515  }
517  private:
// Validation hook called by WriteSample() before the sample is written. The default
// accepts every sample; NumericSampleStream overrides it to check its limits.
529  virtual void ValidateSample(const SampleT& Sample) const {}
// I/O stream used by WriteSample()/ReadSample(); the constructor attaches it to StreamBuffer.
533  std::iostream Stream;
535  };
541  class BasicSampleStream : public CircularDataStream<BasicSample>
542  {
543  public:
548  BasicSampleStream(size_t BufferSizeInSamples) : CircularDataStream(BufferSizeInSamples) {}
550  virtual ~BasicSampleStream() = default;
552  bool IsBasicSampleConvertible() const noexcept override final { return true; }
553  bool IsBasicSampleTimeUsed() const noexcept override final { return true; }
555  private:
556  virtual void WriteBasicSampleChild(const BasicSample& Sample) override { WriteSample(Sample); }
557  virtual BasicSample ReadBasicSampleChild() override { return ReadSample(); }
558  };
565  template <
566  typename SampleT,
567  std::enable_if_t<std::is_arithmetic_v<SampleT>, int> = 0
568  >
569  class NumericSampleStream : public CircularDataStream<SampleT>
570  {
571  public:
577  NumericSampleStream(size_t BufferSizeInSamples) : CircularDataStream<SampleT>(BufferSizeInSamples),
578  MinValue(std::numeric_limits<SampleT>::lowest()), MaxValue(std::numeric_limits<SampleT>::max()) {}
587  NumericSampleStream(size_t BufferSizeInSamples, SampleT MinValue, SampleT MaxValue)
588  : CircularDataStream<SampleT>(BufferSizeInSamples), MinValue(MinValue), MaxValue(MaxValue) {}
590  virtual ~NumericSampleStream() = default;
592  bool IsBasicSampleConvertible() const noexcept override final { return true; }
599  void SetLimits(SampleT MinValue, SampleT MaxValue)
600  {
601  this->MinValue = MinValue;
602  this->MaxValue = MaxValue;
603  }
609  auto GetMinValue() const noexcept { return MinValue; }
615  auto GetMaxValue() const noexcept { return MaxValue; }
617  private:
623  virtual void WriteBasicSampleChild(const BasicSample& Sample) override
624  {
625  CircularDataStream<SampleT>::WriteSample(static_cast<SampleT>(Sample.Value));
626  }
633  {
634  BasicSample Sample;
636  // Ignore time value (set it to 0). It has been considered (and discarded) to set it to
637  // std::chrono::system_clock::now() instead. This would be misleading since a timestamp
638  // which is set here refers to the time when the sample is read from the buffer - not
639  // to the time when it is actually acquired by the underlying hardware.
640  Sample.Time = 0;
641  Sample.Value = static_cast<decltype(Sample.Value)>(CircularDataStream<SampleT>::ReadSample());
643  return Sample;
644  }
646  virtual void ValidateSample(const SampleT& Sample) const override
647  {
648  if (Sample < MinValue || Sample > MaxValue)
650  "A sample cannot be written to a stream buffer because it exceeds the specified limits (min: " + std::to_string(MinValue)
651  + ", max: " + std::to_string(MaxValue) + ", sample: " + std::to_string(Sample) + ")");
652  }
654  SampleT MinValue;
655  SampleT MaxValue;
656  };
667  {
// Default stream size; also the default member value of ValueType::StreamSize below.
// The StreamSize parameter's description text states that the size is in samples.
668  static constexpr ParamsConfigDialog::NumberType DefaultStreamSize = 1000;
670  public:
// Plain bundle of the values held by this extension, as returned by Values().
675  struct ValueType
676  {
677  ParamsConfigDialog::NumberType StreamSize = DefaultStreamSize;
678  };
688  ParamsConfigDialog::NumberType DefaultValue = DefaultStreamSize, ParamsConfigDialog::NumberType MinValue = 1,
689  ParamsConfigDialog::NumberType MaxValue = static_cast<double>(std::numeric_limits<size_t>::max()))
690  : StreamSize{ Owner, "StreamSize", "Stream size",
691  "Size of the instrument's sample stream buffer in samples.", true, DefaultValue, MinValue, MaxValue, 1, 0 }
692  {}
// Defined out of line.
// NOTE(review): presumably makes the StreamSize parameter non-editable by the user - confirm.
697  void DisableUserEditable();
// Returns the current StreamSize packed into a ValueType.
704  ValueType Values() const { return { StreamSize }; }
711  };
717  {
718  public:
// Sampling mode. The SamplingMode parameter's description text reads: "Determines
// whether reading/writing happens only once or continuously."
722  enum SamplingModeType { Single, Continuous };
733  private:
// Defaults used for the parameters and for the members of ValueType below.
734  static constexpr ParamsConfigDialog::NumberType DefaultSamplingRate = 1;
735  static constexpr SamplingModeType DefaultSamplingMode = SamplingModeType::Single;
737  public:
// Plain bundle of the values held by this extension, as returned by Values().
742  struct ValueType
743  {
// The SamplingRate parameter's description text states samples per second.
744  ParamsConfigDialog::NumberType SamplingRate = DefaultSamplingRate;
745  SamplingModeType SamplingMode = DefaultSamplingMode;
746  };
// Returns the text/value list for SamplingModeType that is passed to the SamplingMode
// parameter on construction. Defined out of line.
752  static Util::TextValueListType<SamplingModeType> SamplingModeTypeStrList();
759  : SamplingRate{ Owner, "SamplingRate", "Sampling rate",
760  "Sampling rate in samples per second.", true, DefaultSamplingRate, 0, std::numeric_limits<ParamsConfigDialog::NumberType>::max(), 1, 3 },
761  SamplingMode{ Owner, SamplingModeTypeStrList(), "SamplingMode", "Sampling mode",
762  "Determines whether reading/writing happens only once or continuously.", true, DefaultSamplingMode }
763  {}
// Defined out of line.
// NOTE(review): presumably makes the sampling parameters non-editable by the user - confirm.
768  void DisableUserEditable();
// Returns the current SamplingRate and SamplingMode packed into a ValueType.
775  ValueType Values() const { return { SamplingRate, SamplingMode}; }
788  };
794  {
795  public:
// Numeric type used for the hardware limit values of this instrument data class.
800  using ValueType = double;
// Unit selector stored with the instrument data (see GetValueUnit()/SetValueUnit()).
807  enum class UnitType {
808  Arbitrary,
809  LogicLevel,
810  Counts,
811  Volt,
812  Ampere,
813  Power_W,
814  Power_dBm
815  };
// Maps a UnitType to a C string. Defined out of line.
822  static const char* UnitTypeToStr(const UnitType& Unit);
832  virtual ~DataStreamInstrumentData() = default;
// Returns a non-owning raw pointer to the sample stream held in SampleStream.
840  DataStreamBasePtrType::element_type* GetSampleStream() const noexcept { return SampleStream.get(); }
848  template <typename T>
849  auto GetCastSampleStream() const
850  {
851  auto CastSampleStream = dynamic_cast<T*>(GetSampleStream());
852  if (!CastSampleStream)
853  throw Util::TypeErrorException();
855  return CastSampleStream;
856  }
// Plain accessors for the stored hardware limits and the value unit. They read or
// assign the corresponding member without further checks.
858  ValueType GetHardwareMinValue() const noexcept { return HardwareMinValue; }
859  ValueType GetHardwareMaxValue() const noexcept { return HardwareMaxValue; }
860  UnitType GetValueUnit() const noexcept { return ValueUnit; }
861  void SetHardwareMinValue(ValueType Value) noexcept { HardwareMinValue = Value; }
862  void SetHardwareMaxValue(ValueType Value) noexcept { HardwareMaxValue = Value; }
863  void SetValueUnit(UnitType Unit) noexcept { ValueUnit = Unit; }
865  private:
// Reset implementation selected by the InstrumentDataBase dispatch tag. Final, so
// derived classes cannot override this overload. Defined out of line.
866  void ResetImpl(dispatch_tag<InstrumentDataBase>) override final;
880  };
886  {
887  public:
// Forwards the item ID and the core reference to InstrumentParamsBase.
892  DataStreamInstrumentParams(DynExp::ItemIDType ID, const DynExp::DynExpCore& Core) : InstrumentParamsBase(ID, Core) {}
// Pure virtual destructor: the class is abstract. A definition is still required elsewhere.
894  virtual ~DataStreamInstrumentParams() = 0;
// Returns the tag string identifying this parameter class.
896  virtual const char* GetParamClassTag() const noexcept override { return "DataStreamInstrumentParams"; }
898  private:
// Dummy parameter member constructed with this parameter object as its owner.
902  DummyParam Dummy = { *this };
903  };
909  {
910  public:
// Pure virtual destructor: the class is abstract. A definition is still required elsewhere.
915  virtual ~DataStreamInstrumentConfigurator() = 0;
916  };
924  {
925  public:
// Compile-time name and category strings; GetName()/GetCategory() below return them.
939  constexpr static auto Name() noexcept { return "Data Stream Instrument"; }
940  constexpr static auto Category() noexcept { return "I/O"; }
// Forwards the owner thread ID and the parameter object to InstrumentBase.
945  DataStreamInstrument(const std::thread::id OwnerThreadID, DynExp::ParamsBasePtrType&& Params)
946  : InstrumentBase(OwnerThreadID, std::move(Params)) {}
// Pure virtual destructor: the class is abstract. A definition is still required elsewhere.
948  virtual ~DataStreamInstrument() = 0;
950  virtual std::string GetName() const override { return Name(); }
951  virtual std::string GetCategory() const override { return Category(); }
// Returns the string for the unit reported by GetValueUnit().
972  const char* GetValueUnitStr() const noexcept { return DataStreamInstrumentData::UnitTypeToStr(GetValueUnit()); }
// Overridable operations. Each takes an optional callback that defaults to nullptr.
// ReadData, WriteData, Restart and SetStreamSize are defined out of line; ClearData,
// Start, Stop and ResetStreamSize do nothing unless a derived class overrides them.
987  virtual void ReadData(DynExp::TaskBase::CallbackType CallbackFunc = nullptr) const;
995  virtual void WriteData(DynExp::TaskBase::CallbackType CallbackFunc = nullptr) const;
1001  virtual void ClearData(DynExp::TaskBase::CallbackType CallbackFunc = nullptr) const {}
1008  virtual void Start(DynExp::TaskBase::CallbackType CallbackFunc = nullptr) const {}
1015  virtual void Stop(DynExp::TaskBase::CallbackType CallbackFunc = nullptr) const {}
1023  virtual void Restart(DynExp::TaskBase::CallbackType CallbackFunc = nullptr) const;
// BufferSizeInSamples is the requested stream buffer size in samples.
1032  virtual void SetStreamSize(size_t BufferSizeInSamples, DynExp::TaskBase::CallbackType CallbackFunc = nullptr) const;
1039  virtual void ResetStreamSize(DynExp::TaskBase::CallbackType CallbackFunc = nullptr) const {}
// Non-virtual helpers taking a timeout that defaults to GetInstrumentDataTimeoutDefault.
// Defined out of line.
// NOTE(review): presumably the timeout bounds the wait for the instrument data - confirm.
1066  bool CanRead(const std::chrono::milliseconds Timeout = GetInstrumentDataTimeoutDefault) const;
1074  void Clear(const std::chrono::milliseconds Timeout = GetInstrumentDataTimeoutDefault) const;
1076  private:
// Reset implementation selected by the InstrumentBase dispatch tag. Final, so derived
// classes cannot override this overload. Defined out of line.
1077  void ResetImpl(dispatch_tag<InstrumentBase>) override final;
// Task factories: each creates the matching task type from DataStreamInstrumentTasks
// through DynExp::MakeTask.
1080  virtual std::unique_ptr<DynExp::InitTaskBase> MakeInitTask() const override { return DynExp::MakeTask<DataStreamInstrumentTasks::InitTask>(); }
1081  virtual std::unique_ptr<DynExp::ExitTaskBase> MakeExitTask() const override { return DynExp::MakeTask<DataStreamInstrumentTasks::ExitTask>(); }
1082  virtual std::unique_ptr<DynExp::UpdateTaskBase> MakeUpdateTask() const override { return DynExp::MakeTask<DataStreamInstrumentTasks::UpdateTask>(); }
1083  };
1084 }
