Highly flexible laboratory automation for dynamically changing experiments.
1 // This file is part of DynExp.
8 #pragma once
10 #include "stdafx.h"
11 #include "DynExpCore.h"
12 #include "../HardwareAdapters/HardwareAdaptergRPC.h"
13 #include "../MetaInstruments/gRPCInstrument.h"
15 namespace DynExpModule
16 {
17  template <typename... gRPCServices>
18  class gRPCModule;
24  template <typename... gRPCServices>
26  {
27  public:
29  virtual ~gRPCModuleData() = default;
31  private:
35  void ResetImpl(dispatch_tag<ModuleDataBase>) override final { Init(); }
46  void Init() {}
47  };
53  template <typename... gRPCServices>
55  {
56  public:
62  : ModuleParamsBase(ID, Core), NetworkParams(*this) {}
64  virtual ~gRPCModuleParams() = default;
69  virtual const char* GetParamClassTag() const noexcept override { return "gRPCModuleParams"; }
73  private:
87  virtual const DynExp::NetworkParamsExtension* GetNetworkAddressParamsChild() const noexcept override { return &NetworkParams; }
88  };
94  template <typename... gRPCServices>
96  {
97  public:
98  using ObjectType = gRPCModule<gRPCServices...>;
99  using ParamsType = gRPCModuleParams<gRPCServices...>;
102  virtual ~gRPCModuleConfigurator() = default;
103  };
114  template <typename... gRPCServices>
116  {
121  template <typename gRPCService>
122  using ServicePtrType = std::unique_ptr<typename gRPCService::AsyncService>;
124  public:
125  using ParamsType = gRPCModuleParams<gRPCServices...>;
126  using ConfigType = gRPCModuleConfigurator<gRPCServices...>;
127  using ModuleDataType = gRPCModuleData<gRPCServices...>;
// Compile-time name of this module type.
static constexpr auto Name() noexcept
{
	return "gRPC Module";
}
// Compile-time category of this module type.
static constexpr auto Category() noexcept
{
	return "Network Modules (Servers)";
}
136  : ModuleBase(OwnerThreadID, std::move(Params)),
139  virtual ~gRPCModule() = default;
141  virtual std::string GetName() const override { return Name(); }
142  virtual std::string GetCategory() const override { return Category(); }
148  bool TreatModuleExceptionsAsWarnings() const override { return ServerRunning; }
155  std::chrono::milliseconds GetMainLoopDelay() const override final { return std::chrono::milliseconds(1); }
161  grpc::ServerCompletionQueue* GetServerQueue() const noexcept { return ServerQueue.get(); }
169  template <size_t Index>
170  auto& GetService() const noexcept { return *std::get<Index>(ServicePtrs); }
179  template <typename T>
180  auto& GetService() const noexcept { return *std::get<ServicePtrType<T>>(ServicePtrs); }
182  protected:
188  {
// States used by the call data state machine.
enum class StateType
{
	Init,
	Process,
	Exit
};
204  protected:
209  CallDataBase(const gRPCModule* const OwningModule) noexcept
212  public:
213  virtual ~CallDataBase() {}
219  auto GetOwningModule() const noexcept { return OwningModule; }
227  void Proceed(DynExp::ModuleInstance& Instance) { StateMachine.Invoke(*this, Instance); }
229  protected:
234  auto* GetServerContext() noexcept { return &ServerContext; }
236  private:
246  virtual void InitChild(DynExp::ModuleInstance& Instance) = 0;
255  virtual void ProcessChild(DynExp::ModuleInstance& Instance) = 0;
264  {
265  InitChild(Instance);
267  return StateType::Process;
268  }
276  {
277  ProcessChild(Instance);
279  return StateType::Exit;
280  }
289  {
290  delete this;
292  return StateType::Exit;
293  }
310  const gRPCModule* const OwningModule;
312  grpc::ServerContext ServerContext;
313  };
326  template <typename DerivedType, typename gRPCService, typename RequestMessageType, typename ResponseMessageType,
327  typename std::enable_if_t<Util::is_contained_in_v<gRPCService, gRPCServices...>, int> = 0>
329  {
330  public:
337  static void MakeCall(const gRPCModule* const OwningModule, DynExp::ModuleInstance& Instance) { (new DerivedType(OwningModule))->Proceed(Instance); }
339  private:
340  friend DerivedType;
347  using ResponseWriterType = grpc::ServerAsyncResponseWriter<ResponseMessageType>;
353  using RequestFuncType = std::function<void(typename gRPCService::AsyncService*, grpc::ServerContext*,
354  RequestMessageType*, ResponseWriterType*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*)>;
362  TypedCallDataBase(const gRPCModule* const OwningModule, const RequestFuncType RequestFunc) noexcept
363  : CallDataBase(OwningModule), RequestFunc(RequestFunc), ResponseWriter(this->GetServerContext()) {}
365  virtual ~TypedCallDataBase() = default;
370  void InitChild(DynExp::ModuleInstance& Instance) override final
371  {
372  // The address of *this* instance serves as the tag to distinguish multiple remote procedure calls.
373  RequestFunc(&this->GetOwningModule()->template GetService<gRPCService>(), this->GetServerContext(),
374  &RequestMessage, &ResponseWriter, this->GetOwningModule()->GetServerQueue(), this->GetOwningModule()->GetServerQueue(), this);
375  }
380  void ProcessChild(DynExp::ModuleInstance& Instance) override final
381  {
382  MakeCall(this->GetOwningModule(), Instance);
384  ProcessChildImpl(Instance);
386  ResponseWriter.Finish(ResponseMessage, grpc::Status::OK, this);
387  }
398  virtual void ProcessChildImpl(DynExp::ModuleInstance& Instance) = 0;
402  RequestMessageType RequestMessage;
403  ResponseMessageType ResponseMessage;
405  };
407  private:
413  auto MakeServicePtrTuple() { return std::make_tuple(std::make_unique<typename ServicePtrType<gRPCServices>::element_type>()...); }
419  {
420  void* Tag;
421  bool IsOK;
423  auto Result = ServerQueue->AsyncNext(&Tag, &IsOK, std::chrono::system_clock::now() + std::chrono::milliseconds(80));
425  if (Result == grpc::CompletionQueue::NextStatus::GOT_EVENT && Tag && IsOK)
426  static_cast<CallDataBase*>(Tag)->Proceed(Instance);
429  }
434  void ResetImpl(dispatch_tag<ModuleBase>) override final
435  {
436  ServerQueue.reset();
437  Server.reset();
440  ServerRunning = false;
443  }
471  virtual void OnInitChild(DynExp::ModuleInstance* Instance) const {}
478  virtual void OnExitChild(DynExp::ModuleInstance* Instance) const {}
491  void OnInit(DynExp::ModuleInstance* Instance) const override final
492  {
493  std::string Address;
494  std::string ObjName;
496  {
497  auto ModuleParams = DynExp::dynamic_Params_cast<gRPCModule>(Instance->ParamsGetter());
498  Address = ModuleParams->NetworkParams.MakeAddress();
499  ObjName = ModuleParams->ObjectName;
500  } // ModuleParams unlocked here.
502  grpc::ServerBuilder ServerBuilder;
503  ServerBuilder.AddListeningPort(Address, grpc::InsecureServerCredentials());
504  std::apply([&ServerBuilder](auto&... ServicePtr) { (ServerBuilder.RegisterService(ServicePtr.get()), ...); }, ServicePtrs);
505  ServerQueue = ServerBuilder.AddCompletionQueue();
506  Server = ServerBuilder.BuildAndStart();
509  OnInitChild(Instance);
511  ServerRunning = true;
512  Util::EventLog().Log("gRPC server \"" + ObjName + "\" (" + GetCategoryAndName() + ") listening on " + Address + ".");
513  }
519  void OnExit(DynExp::ModuleInstance* Instance) const override final
520  {
521  OnExitChild(Instance);
522  Shutdown();
524  Util::EventLog().Log("gRPC server \"" + GetObjectName() + "\" (" + GetCategoryAndName() + ") shut down.");
525  }
531  void OnErrorChild(DynExp::ModuleInstance& Instance) const override final
532  {
533  Shutdown();
534  }
540  void Shutdown() const
541  {
542  if (Server)
543  Server->Shutdown();
544  if (ServerQueue)
545  ServerQueue->Shutdown();
548  }
554  void DrainServerQueue() const
555  {
556  if (!ServerQueue)
557  return;
559  bool Result = true;
560  while (Result)
561  {
562  void* Tag = nullptr;
563  bool IsOK;
564  Result = ServerQueue->Next(&Tag, &IsOK);
566  if (Result && Tag)
567  delete static_cast<CallDataBase*>(Tag);
568  }
569  }
574  mutable std::unique_ptr<grpc::ServerCompletionQueue> ServerQueue;
579  mutable std::unique_ptr<grpc::Server> Server;
584  std::tuple<ServicePtrType<gRPCServices>...> ServicePtrs;
589  mutable std::atomic<bool> ServerRunning;
590  };
591 }
Accumulates include statements to provide a precompiled header.