openMSX
EventDistributor.cc
Go to the documentation of this file.
1 #include "EventDistributor.hh"
2 #include "EventListener.hh"
3 #include "Reactor.hh"
4 #include "RTScheduler.hh"
5 #include "Interpreter.hh"
6 #include "InputEventGenerator.hh"
7 #include "Thread.hh"
8 #include "ranges.hh"
9 #include "stl.hh"
10 #include "view.hh"
11 #include <cassert>
12 #include <chrono>
13 
14 using std::string;
15 
16 namespace openmsx {
17 
19  : reactor(reactor_)
20 {
21 }
22 
24  EventType type, EventListener& listener, Priority priority)
25 {
26  std::lock_guard<std::mutex> lock(mutex);
27  auto& priorityMap = listeners[type];
28  // a listener may only be registered once for each type
29  assert(!contains(view::values(priorityMap), &listener));
30  // insert at highest position that keeps listeners sorted on priority
31  auto it = ranges::upper_bound(priorityMap, priority, LessTupleElement<0>());
32  priorityMap.insert(it, {priority, &listener});
33 }
34 
36  EventType type, EventListener& listener)
37 {
38  std::lock_guard<std::mutex> lock(mutex);
39  auto& priorityMap = listeners[type];
40  priorityMap.erase(rfind_if_unguarded(priorityMap,
41  [&](PriorityMap::value_type v) { return v.second == &listener; }));
42 }
43 
45 {
46  // TODO: Implement a real solution against modifying data structure while
47  // iterating through it.
48  // For example, assign nullptr first and then iterate again after
49  // delivering events to remove the nullptr values.
50  // TODO: Is it useful to test for 0 listeners or should we just always
51  // queue the event?
52  assert(event);
53  std::unique_lock<std::mutex> lock(mutex);
54  if (!listeners[event->getType()].empty()) {
55  scheduledEvents.push_back(event);
56  // must release lock, otherwise there's a deadlock:
57  // thread 1: Reactor::deleteMotherBoard()
58  // EventDistributor::unregisterEventListener()
59  // thread 2: EventDistributor::distributeEvent()
60  // Reactor::enterMainLoop()
61  condition.notify_all();
62  lock.unlock();
63  reactor.enterMainLoop();
64  }
65 }
66 
67 bool EventDistributor::isRegistered(EventType type, EventListener* listener) const
68 {
69  return contains(view::values(listeners[type]), listener);
70 }
71 
73 {
74  assert(Thread::isMainThread());
75 
76  reactor.getInputEventGenerator().poll();
77  reactor.getInterpreter().poll();
78  reactor.getRTScheduler().execute();
79 
80  std::unique_lock<std::mutex> lock(mutex);
81  // It's possible that executing an event triggers scheduling of another
82  // event. We also want to execute those secondary events. That's why
83  // we have this while loop here.
84  // For example the 'loadstate' command event, triggers a machine switch
85  // event and as reaction to the latter event, AfterCommand will
86  // unsubscribe from the ols MSXEventDistributor. This really should be
87  // done before we exit this method.
88  while (!scheduledEvents.empty()) {
89  EventQueue eventsCopy;
90  swap(eventsCopy, scheduledEvents);
91 
92  for (auto& event : eventsCopy) {
93  auto type = event->getType();
94  auto priorityMapCopy = listeners[type];
95  lock.unlock();
96  auto blockPriority = unsigned(-1); // allow all
97  for (auto& p : priorityMapCopy) {
98  // It's possible delivery to one of the previous
99  // Listeners unregistered the current Listener.
100  if (!isRegistered(type, p.second)) continue;
101 
102  unsigned currentPriority = p.first;
103  if (currentPriority >= blockPriority) break;
104 
105  if (unsigned block = p.second->signalEvent(event)) {
106  assert(block > currentPriority);
107  blockPriority = block;
108  }
109  }
110  lock.lock();
111  }
112  }
113 }
114 
115 bool EventDistributor::sleep(unsigned us)
116 {
117  std::chrono::microseconds duration(us);
118  std::unique_lock<std::mutex> lock(cvMutex);
119  return condition.wait_for(lock, duration) == std::cv_status::timeout;
120 }
121 
122 } // namespace openmsx
auto values(Map &&map)
Definition: view.hh:323
Contains the main loop of openMSX.
Definition: Reactor.hh:64
void swap(optional< T > &x, optional< T > &y) noexcept(noexcept(x.swap(y)))
Definition: optional.hh:816
void registerEventListener(EventType type, EventListener &listener, Priority priority=OTHER)
Registers a given object to receive certain events.
void unregisterEventListener(EventType type, EventListener &listener)
Unregisters a previously registered event listener.
Priority
Priorities from high to low, higher priority listeners can block events for lower priority listeners...
bool contains(ITER first, ITER last, const VAL &val)
Check if a range contains a given value, using linear search.
Definition: stl.hh:106
void distributeEvent(const EventPtr &event)
Schedule the given event for delivery.
bool sleep(unsigned us)
Sleep for the specified amount of time, but return early when (another thread) called the distributeE...
EventDistributor(Reactor &reactor)
InputEventGenerator & getInputEventGenerator()
Definition: Reactor.hh:82
std::shared_ptr< const Event > EventPtr
void deliverEvents()
This actually delivers the events.
auto upper_bound(ForwardRange &&range, const T &value)
Definition: ranges.hh:83
RTScheduler & getRTScheduler()
Definition: Reactor.hh:78
Thanks to enen for testing this on a real cartridge:
Definition: Autofire.cc:5
EventType
Definition: Event.hh:10
void enterMainLoop()
Definition: Reactor.cc:477
void execute()
Execute all expired RTSchedulables.
Definition: RTScheduler.hh:23
auto rfind_if_unguarded(RANGE &range, PRED pred)
Definition: stl.hh:174
bool isMainThread()
Returns true when called from the main thread.
Definition: Thread.cc:16
Interpreter & getInterpreter()
Definition: Reactor.cc:285