openMSX
EventDistributor.cc
Go to the documentation of this file.
1 #include "EventDistributor.hh"
2 #include "EventListener.hh"
3 #include "Reactor.hh"
4 #include "RTScheduler.hh"
5 #include "Interpreter.hh"
6 #include "InputEventGenerator.hh"
7 #include "Thread.hh"
8 #include "KeyRange.hh"
9 #include "stl.hh"
10 #include <algorithm>
11 #include <cassert>
12 #include <chrono>
13 
14 using std::string;
15 
16 namespace openmsx {
17 
19  : reactor(reactor_)
20 {
21 }
22 
24  EventType type, EventListener& listener, Priority priority)
25 {
26  std::lock_guard<std::mutex> lock(mutex);
27  auto& priorityMap = listeners[type];
28  for (auto* l : values(priorityMap)) {
29  // a listener may only be registered once for each type
30  assert(l != &listener); (void)l;
31  }
32  // insert at highest position that keeps listeners sorted on priority
33  auto it = upper_bound(begin(priorityMap), end(priorityMap), priority,
35  priorityMap.insert(it, {priority, &listener});
36 }
37 
39  EventType type, EventListener& listener)
40 {
41  std::lock_guard<std::mutex> lock(mutex);
42  auto& priorityMap = listeners[type];
43  priorityMap.erase(rfind_if_unguarded(priorityMap,
44  [&](PriorityMap::value_type v) { return v.second == &listener; }));
45 }
46 
48 {
49  // TODO: Implement a real solution against modifying data structure while
50  // iterating through it.
51  // For example, assign nullptr first and then iterate again after
52  // delivering events to remove the nullptr values.
53  // TODO: Is it useful to test for 0 listeners or should we just always
54  // queue the event?
55  assert(event);
56  std::unique_lock<std::mutex> lock(mutex);
57  if (!listeners[event->getType()].empty()) {
58  scheduledEvents.push_back(event);
59  // must release lock, otherwise there's a deadlock:
60  // thread 1: Reactor::deleteMotherBoard()
61  // EventDistributor::unregisterEventListener()
62  // thread 2: EventDistributor::distributeEvent()
63  // Reactor::enterMainLoop()
64  condition.notify_all();
65  lock.unlock();
66  reactor.enterMainLoop();
67  }
68 }
69 
70 bool EventDistributor::isRegistered(EventType type, EventListener* listener) const
71 {
72  for (auto* l : values(listeners[type])) {
73  if (l == listener) return true;
74  }
75  return false;
76 }
77 
79 {
80  assert(Thread::isMainThread());
81 
82  reactor.getInputEventGenerator().poll();
83  reactor.getInterpreter().poll();
84  reactor.getRTScheduler().execute();
85 
86  std::unique_lock<std::mutex> lock(mutex);
87  // It's possible that executing an event triggers scheduling of another
88  // event. We also want to execute those secondary events. That's why
89  // we have this while loop here.
90  // For example the 'loadstate' command event, triggers a machine switch
91  // event and as reaction to the latter event, AfterCommand will
92  // unsubscribe from the ols MSXEventDistributor. This really should be
93  // done before we exit this method.
94  while (!scheduledEvents.empty()) {
95  EventQueue eventsCopy;
96  swap(eventsCopy, scheduledEvents);
97 
98  for (auto& event : eventsCopy) {
99  auto type = event->getType();
100  auto priorityMapCopy = listeners[type];
101  lock.unlock();
102  auto blockPriority = unsigned(-1); // allow all
103  for (auto& p : priorityMapCopy) {
104  // It's possible delivery to one of the previous
105  // Listeners unregistered the current Listener.
106  if (!isRegistered(type, p.second)) continue;
107 
108  unsigned currentPriority = p.first;
109  if (currentPriority >= blockPriority) break;
110 
111  if (unsigned block = p.second->signalEvent(event)) {
112  assert(block > currentPriority);
113  blockPriority = block;
114  }
115  }
116  lock.lock();
117  }
118  }
119 }
120 
121 bool EventDistributor::sleep(unsigned us)
122 {
123  std::chrono::microseconds duration(us);
124  std::unique_lock<std::mutex> lock(cvMutex);
125  return condition.wait_for(lock, duration) == std::cv_status::timeout;
126 }
127 
128 } // namespace openmsx
detail::KeyRange< MAP, 1 > values(const MAP &map)
Definition: KeyRange.hh:54
Contains the main loop of openMSX.
Definition: Reactor.hh:64
string_view::const_iterator begin(const string_view &x)
Definition: string_view.hh:152
void registerEventListener(EventType type, EventListener &listener, Priority priority=OTHER)
Registers a given object to receive certain events.
void unregisterEventListener(EventType type, EventListener &listener)
Unregisters a previously registered event listener.
Priority
Priorities from high to low, higher priority listeners can block events for lower priority listeners...
void distributeEvent(const EventPtr &event)
Schedule the given event for delivery.
bool sleep(unsigned us)
Sleep for the specified amount of time, but return early when (another thread) called the distributeE...
EventDistributor(Reactor &reactor)
InputEventGenerator & getInputEventGenerator()
Definition: Reactor.hh:82
std::shared_ptr< const Event > EventPtr
void deliverEvents()
This actually delivers the events.
RTScheduler & getRTScheduler()
Definition: Reactor.hh:78
string_view::const_iterator end(const string_view &x)
Definition: string_view.hh:153
Thanks to enen for testing this on a real cartridge:
Definition: Autofire.cc:5
EventType
Definition: Event.hh:10
void enterMainLoop()
Definition: Reactor.cc:488
void execute()
Execute all expired RTSchedulables.
Definition: RTScheduler.hh:23
auto rfind_if_unguarded(RANGE &range, PRED pred) -> decltype(std::begin(range))
Definition: stl.hh:174
bool isMainThread()
Returns true when called from the main thread.
Definition: Thread.cc:16
Interpreter & getInterpreter()
Definition: Reactor.cc:285