openMSX
EventDistributor.cc
Go to the documentation of this file.
1 #include "EventDistributor.hh"
2 #include "EventListener.hh"
3 #include "Reactor.hh"
4 #include "RTScheduler.hh"
5 #include "Interpreter.hh"
6 #include "InputEventGenerator.hh"
7 #include "Thread.hh"
8 #include "ranges.hh"
9 #include "stl.hh"
10 #include <cassert>
11 #include <chrono>
12 
13 namespace openmsx {
14 
16  : reactor(reactor_)
17 {
18 }
19 
21  EventType type, EventListener& listener, Priority priority)
22 {
23  std::lock_guard<std::mutex> lock(mutex);
24  auto& priorityMap = listeners[size_t(type)];
25  // a listener may only be registered once for each type
26  assert(!contains(priorityMap, &listener, &Entry::listener));
27  // insert at highest position that keeps listeners sorted on priority
28  auto it = ranges::upper_bound(priorityMap, priority, {}, &Entry::priority);
29  priorityMap.emplace(it, Entry{priority, &listener});
30 }
31 
33  EventType type, EventListener& listener)
34 {
35  std::lock_guard<std::mutex> lock(mutex);
36  auto& priorityMap = listeners[size_t(type)];
37  priorityMap.erase(rfind_unguarded(priorityMap, &listener, &Entry::listener));
38 }
39 
41 {
42  // TODO: Implement a real solution against modifying data structure while
43  // iterating through it.
44  // For example, assign nullptr first and then iterate again after
45  // delivering events to remove the nullptr values.
46  // TODO: Is it useful to test for 0 listeners or should we just always
47  // queue the event?
48  assert(event);
49  std::unique_lock<std::mutex> lock(mutex);
50  if (!listeners[size_t(getType(event))].empty()) {
51  scheduledEvents.push_back(std::move(event));
52  // must release lock, otherwise there's a deadlock:
53  // thread 1: Reactor::deleteMotherBoard()
54  // EventDistributor::unregisterEventListener()
55  // thread 2: EventDistributor::distributeEvent()
56  // Reactor::enterMainLoop()
57  condition.notify_all();
58  lock.unlock();
59  reactor.enterMainLoop();
60  }
61 }
62 
63 bool EventDistributor::isRegistered(EventType type, EventListener* listener) const
64 {
65  return contains(listeners[size_t(type)], listener, &Entry::listener);
66 }
67 
69 {
70  static PriorityMap priorityMapCopy; // static to preserve capacity
71  static EventQueue eventsCopy; // static to preserve capacity
72 
73  assert(Thread::isMainThread());
74 
75  reactor.getInputEventGenerator().poll();
76  reactor.getInterpreter().poll();
77  reactor.getRTScheduler().execute();
78 
79  std::unique_lock<std::mutex> lock(mutex);
80  // It's possible that executing an event triggers scheduling of another
81  // event. We also want to execute those secondary events. That's why
82  // we have this while loop here.
83  // For example the 'loadstate' command event, triggers a machine switch
84  // event and as reaction to the latter event, AfterCommand will
85  // unsubscribe from the ols MSXEventDistributor. This really should be
86  // done before we exit this method.
87  while (!scheduledEvents.empty()) {
88  assert(eventsCopy.empty());
89  swap(eventsCopy, scheduledEvents);
90  for (auto& event : eventsCopy) {
91  auto type = getType(event);
92  priorityMapCopy = listeners[size_t(type)];
93  lock.unlock();
94  int blockPriority = Priority::LOWEST; // allow all
95  for (const auto& e : priorityMapCopy) {
96  // It's possible delivery to one of the previous
97  // Listeners unregistered the current Listener.
98  if (!isRegistered(type, e.listener)) continue;
99 
100  if (e.priority >= blockPriority) break;
101 
102  if (int block = e.listener->signalEvent(event)) {
103  assert(block > e.priority);
104  blockPriority = block;
105  }
106  }
107  lock.lock();
108  }
109  eventsCopy.clear();
110  }
111 }
112 
113 bool EventDistributor::sleep(unsigned us)
114 {
115  std::chrono::microseconds duration(us);
116  std::unique_lock<std::mutex> lock(cvMutex);
117  return condition.wait_for(lock, duration) == std::cv_status::timeout;
118 }
119 
120 } // namespace openmsx
void unregisterEventListener(EventType type, EventListener &listener)
Unregisters a previously registered event listener.
void distributeEvent(Event &&event)
Schedule the given event for delivery.
EventDistributor(Reactor &reactor)
bool sleep(unsigned us)
Sleep for the specified amount of time, but return early when (another thread) called the distributeE...
void registerEventListener(EventType type, EventListener &listener, Priority priority=OTHER)
Registers a given object to receive certain events.
void deliverEvents()
This actually delivers the events.
Priority
Priorities from high to low, higher priority listeners can block events for lower priority listeners.
void execute()
Execute all expired RTSchedulables.
Definition: RTScheduler.hh:23
Contains the main loop of openMSX.
Definition: Reactor.hh:68
void enterMainLoop()
Definition: Reactor.cc:487
RTScheduler & getRTScheduler()
Definition: Reactor.hh:81
InputEventGenerator & getInputEventGenerator()
Definition: Reactor.hh:85
Interpreter & getInterpreter()
Definition: Reactor.cc:317
bool isMainThread()
Returns true when called from the main thread.
Definition: Thread.cc:15
This file implemented 3 utility functions:
Definition: Autofire.cc:9
EventType
Definition: Event.hh:507
EventType getType(const Event &event)
Definition: Event.hh:645
auto upper_bound(ForwardRange &&range, const T &value, Compare comp={}, Proj proj={})
Definition: ranges.hh:94
auto rfind_unguarded(RANGE &range, const VAL &val, Proj proj={})
Similar to the find(_if)_unguarded functions above, but searches from the back to front.
Definition: stl.hh:109
constexpr bool contains(ITER first, ITER last, const VAL &val)
Check if a range contains a given value, using linear search.
Definition: stl.hh:32