openMSX
EventDistributor.cc
Go to the documentation of this file.
1 #include "EventDistributor.hh"
2 #include "EventListener.hh"
3 #include "Reactor.hh"
4 #include "RTScheduler.hh"
5 #include "Interpreter.hh"
6 #include "InputEventGenerator.hh"
7 #include "Thread.hh"
8 #include "ranges.hh"
9 #include "stl.hh"
10 #include "view.hh"
11 #include <cassert>
12 #include <chrono>
13 
14 using std::string;
15 
16 namespace openmsx {
17 
19  : reactor(reactor_)
20 {
21 }
22 
24  EventType type, EventListener& listener, Priority priority)
25 {
26  std::lock_guard<std::mutex> lock(mutex);
27  auto& priorityMap = listeners[type];
28  // a listener may only be registered once for each type
29  assert(!contains(view::values(priorityMap), &listener));
30  // insert at highest position that keeps listeners sorted on priority
31  auto it = ranges::upper_bound(priorityMap, priority, LessTupleElement<0>());
32  priorityMap.insert(it, {priority, &listener});
33 }
34 
36  EventType type, EventListener& listener)
37 {
38  std::lock_guard<std::mutex> lock(mutex);
39  auto& priorityMap = listeners[type];
40  priorityMap.erase(rfind_if_unguarded(priorityMap,
41  [&](auto& v) { return v.second == &listener; }));
42 }
43 
45 {
46  // TODO: Implement a real solution against modifying data structure while
47  // iterating through it.
48  // For example, assign nullptr first and then iterate again after
49  // delivering events to remove the nullptr values.
50  // TODO: Is it useful to test for 0 listeners or should we just always
51  // queue the event?
52  assert(event);
53  std::unique_lock<std::mutex> lock(mutex);
54  if (!listeners[event->getType()].empty()) {
55  scheduledEvents.push_back(event);
56  // must release lock, otherwise there's a deadlock:
57  // thread 1: Reactor::deleteMotherBoard()
58  // EventDistributor::unregisterEventListener()
59  // thread 2: EventDistributor::distributeEvent()
60  // Reactor::enterMainLoop()
61  condition.notify_all();
62  lock.unlock();
63  reactor.enterMainLoop();
64  }
65 }
66 
67 bool EventDistributor::isRegistered(EventType type, EventListener* listener) const
68 {
69  return contains(view::values(listeners[type]), listener);
70 }
71 
73 {
74  static PriorityMap priorityMapCopy; // static to preserve capacity
75  static EventQueue eventsCopy; // static to preserve capacity
76 
77  assert(Thread::isMainThread());
78 
79  reactor.getInputEventGenerator().poll();
80  reactor.getInterpreter().poll();
81  reactor.getRTScheduler().execute();
82 
83  std::unique_lock<std::mutex> lock(mutex);
84  // It's possible that executing an event triggers scheduling of another
85  // event. We also want to execute those secondary events. That's why
86  // we have this while loop here.
87  // For example the 'loadstate' command event, triggers a machine switch
88  // event and as reaction to the latter event, AfterCommand will
89  // unsubscribe from the ols MSXEventDistributor. This really should be
90  // done before we exit this method.
91  while (!scheduledEvents.empty()) {
92  assert(eventsCopy.empty());
93  swap(eventsCopy, scheduledEvents);
94  for (auto& event : eventsCopy) {
95  auto type = event->getType();
96  priorityMapCopy = listeners[type];
97  lock.unlock();
98  int blockPriority = Priority::LOWEST; // allow all
99  for (const auto& [priority, listener] : priorityMapCopy) {
100  // It's possible delivery to one of the previous
101  // Listeners unregistered the current Listener.
102  if (!isRegistered(type, listener)) continue;
103 
104  if (priority >= blockPriority) break;
105 
106  if (int block = listener->signalEvent(event)) {
107  assert(block > priority);
108  blockPriority = block;
109  }
110  }
111  lock.lock();
112  }
113  eventsCopy.clear();
114  }
115 }
116 
117 bool EventDistributor::sleep(unsigned us)
118 {
119  std::chrono::microseconds duration(us);
120  std::unique_lock<std::mutex> lock(cvMutex);
121  return condition.wait_for(lock, duration) == std::cv_status::timeout;
122 }
123 
124 } // namespace openmsx
void unregisterEventListener(EventType type, EventListener &listener)
Unregisters a previously registered event listener.
EventDistributor(Reactor &reactor)
std::shared_ptr< const Event > EventPtr
bool sleep(unsigned us)
Sleep for the specified amount of time, but return early when (another thread) called the distributeE...
void registerEventListener(EventType type, EventListener &listener, Priority priority=OTHER)
Registers a given object to receive certain events.
void deliverEvents()
This actually delivers the events.
Priority
Priorities from high to low, higher priority listeners can block events for lower priority listeners.
void distributeEvent(const EventPtr &event)
Schedule the given event for delivery.
virtual int signalEvent(const std::shared_ptr< const Event > &event) noexcept=0
This method gets called when an event you are subscribed to occurs.
void execute()
Execute all expired RTSchedulables.
Definition: RTScheduler.hh:23
Contains the main loop of openMSX.
Definition: Reactor.hh:67
void enterMainLoop()
Definition: Reactor.cc:496
RTScheduler & getRTScheduler()
Definition: Reactor.hh:80
InputEventGenerator & getInputEventGenerator()
Definition: Reactor.hh:84
Interpreter & getInterpreter()
Definition: Reactor.cc:320
bool isMainThread()
Returns true when called from the main thread.
Definition: Thread.cc:15
This file implemented 3 utility functions:
Definition: Autofire.cc:9
EventType
Definition: Event.hh:11
auto upper_bound(ForwardRange &&range, const T &value)
Definition: ranges.hh:83
constexpr auto values(Map &&map)
Definition: view.hh:317
constexpr bool contains(ITER first, ITER last, const VAL &val)
Check if a range contains a given value, using linear search.
Definition: stl.hh:92
constexpr auto rfind_if_unguarded(RANGE &range, PRED pred)
Definition: stl.hh:165