openMSX
EventDistributor.cc
Go to the documentation of this file.
1#include "EventDistributor.hh"
2#include "EventListener.hh"
3#include "Reactor.hh"
4#include "RTScheduler.hh"
5#include "Interpreter.hh"
7#include "Thread.hh"
8#include "ranges.hh"
9#include "stl.hh"
10#include <cassert>
11#include <chrono>
12
13namespace openmsx {
14
// Constructor fragment: initializes the `reactor` member from the
// `reactor_` argument; the body itself is empty.
// NOTE(review): the signature line (listing line 15) is missing from this
// extract; the member index further down lists the constructor as
// `EventDistributor(Reactor &reactor)` -- confirm against the real file.
16 : reactor(reactor_)
17{
18}
19
// Listener registration (parameter list and body).
// NOTE(review): the line carrying the function name (listing line 20) is
// missing from this extract; the member index further down suggests this is
// `registerEventListener` -- confirm against the real file.
//
// While holding `mutex`, adds Entry{priority, &listener} to the listener
// list selected by `type`, at a position that keeps the list ordered by
// priority.
21 EventType type, EventListener& listener, Priority priority)
22{
23 std::lock_guard<std::mutex> lock(mutex);
24 auto& priorityMap = listeners[size_t(type)];
25 // a listener may only be registered once for each type
// (this is only checked by the assert, i.e. not when asserts are disabled)
26 assert(!contains(priorityMap, &listener, &Entry::listener));
27 // insert at highest position that keeps listeners sorted on priority
// upper_bound projects each entry onto Entry::priority; presumably with the
// default less-than ordering, so the new entry lands after existing entries
// of equal priority -- TODO confirm in ranges.hh
28 auto it = ranges::upper_bound(priorityMap, priority, {}, &Entry::priority);
29 priorityMap.emplace(it, Entry{priority, &listener});
30}
31
// Listener removal (parameter list and body).
// NOTE(review): the line carrying the function name (listing line 32) is
// missing from this extract; the member index further down suggests this is
// `unregisterEventListener` -- confirm against the real file.
//
// While holding `mutex`, erases the entry whose `listener` field equals
// `&listener` from the listener list selected by `type`.
33 EventType type, EventListener& listener)
34{
35 std::lock_guard<std::mutex> lock(mutex);
36 auto& priorityMap = listeners[size_t(type)];
// rfind_unguarded searches from back to front; the "unguarded" name suggests
// it assumes the element is present, so the listener presumably must have
// been registered for this type -- TODO confirm in stl.hh
37 priorityMap.erase(rfind_unguarded(priorityMap, &listener, &Entry::listener));
38}
39
// Event scheduling (body only).
// NOTE(review): the signature line (listing line 40) is missing from this
// extract; the body uses a parameter named `event`, and the member index
// further down lists `void distributeEvent(Event &&event)` -- presumably
// that function, confirm against the real file.
//
// If at least one listener is registered for the event's type, the event is
// appended to `scheduledEvents`, `condition` is notified, `mutex` is
// released and reactor.enterMainLoop() is called. If no listener is
// registered for that type, nothing is queued.
41{
42 // TODO: Implement a real solution against modifying data structure while
43 // iterating through it.
44 // For example, assign nullptr first and then iterate again after
45 // delivering events to remove the nullptr values.
46 // TODO: Is it useful to test for 0 listeners or should we just always
47 // queue the event?
48 assert(event);
49 std::unique_lock<std::mutex> lock(mutex);
50 if (!listeners[size_t(getType(event))].empty()) {
51 scheduledEvents.push_back(std::move(event));
52 // must release lock, otherwise there's a deadlock:
53 // thread 1: Reactor::deleteMotherBoard()
54 // EventDistributor::unregisterEventListener()
55 // thread 2: EventDistributor::distributeEvent()
56 // Reactor::enterMainLoop()
// notify_all() wakes any thread blocked on `condition` (see sleep())
57 condition.notify_all();
// explicit unlock so that enterMainLoop() runs without `mutex` held
58 lock.unlock();
59 reactor.enterMainLoop();
60 }
61}
62
63bool EventDistributor::isRegistered(EventType type, EventListener* listener) const
64{
65 return contains(listeners[size_t(type)], listener, &Entry::listener);
66}
67
// Event delivery (body only).
// NOTE(review): the signature line (listing line 68) is missing from this
// extract; the member index further down lists `void deliverEvents()` --
// presumably that function, confirm against the real file.
//
// Asserts that it runs on the main thread, polls the input event generator
// and the interpreter, executes the RTScheduler, and then hands every event
// in `scheduledEvents` to the listeners registered for that event's type.
// Events scheduled while delivery is in progress are delivered as well
// before this function returns (outer while loop).
69{
70 static PriorityMap priorityMapCopy; // static to preserve capacity
71 static EventQueue eventsCopy; // static to preserve capacity
72
73 assert(Thread::isMainThread());
74
75 reactor.getInputEventGenerator().poll();
76 reactor.getInterpreter().poll();
77 reactor.getRTScheduler().execute();
78
79 std::unique_lock<std::mutex> lock(mutex);
80 // It's possible that executing an event triggers scheduling of another
81 // event. We also want to execute those secondary events. That's why
82 // we have this while loop here.
83 // For example the 'loadstate' command event, triggers a machine switch
84 // event and as reaction to the latter event, AfterCommand will
85 // unsubscribe from the old MSXEventDistributor. This really should be
86 // done before we exit this method.
87 while (!scheduledEvents.empty()) {
88 assert(eventsCopy.empty());
// Take the whole pending batch; `scheduledEvents` is left empty so events
// queued during delivery form the next batch.
89 swap(eventsCopy, scheduledEvents);
90 for (auto& event : eventsCopy) {
91 auto type = getType(event);
// Snapshot the listener list while `mutex` is held, then release the
// mutex for the duration of the listener callbacks.
92 priorityMapCopy = listeners[size_t(type)];
93 lock.unlock();
94 int blockPriority = Priority::LOWEST; // allow all
95 for (const auto& e : priorityMapCopy) {
96 // It's possible delivery to one of the previous
97 // Listeners unregistered the current Listener.
// NOTE(review): isRegistered() reads `listeners` and `mutex` is not held
// at this point -- confirm this cannot race with (un)registration on
// another thread.
98 if (!isRegistered(type, e.listener)) continue;
99
// Entries are visited in list order; once an entry's priority value is
// >= blockPriority, it and all remaining entries are not signalled.
100 if (e.priority >= blockPriority) break;
101
// A non-zero return value from signalEvent() becomes the new
// blockPriority.
102 if (int block = e.listener->signalEvent(event)) {
103 assert(block > e.priority);
104 blockPriority = block;
105 }
106 }
// Re-acquire `mutex` before touching the shared containers again.
107 lock.lock();
108 }
109 eventsCopy.clear();
110 }
111}
112
113bool EventDistributor::sleep(unsigned us)
114{
115 std::chrono::microseconds duration(us);
116 std::unique_lock<std::mutex> lock(cvMutex);
117 return condition.wait_for(lock, duration) == std::cv_status::timeout;
118}
119
120} // namespace openmsx
void unregisterEventListener(EventType type, EventListener &listener)
Unregisters a previously registered event listener.
void distributeEvent(Event &&event)
Schedule the given event for delivery.
EventDistributor(Reactor &reactor)
bool sleep(unsigned us)
Sleep for the specified amount of time, but return early when (another thread) called the distributeEvent() method.
void registerEventListener(EventType type, EventListener &listener, Priority priority=OTHER)
Registers a given object to receive certain events.
void deliverEvents()
This actually delivers the events.
Priority
Priorities from high to low, higher priority listeners can block events for lower priority listeners.
void execute()
Execute all expired RTSchedulables.
Definition: RTScheduler.hh:22
Contains the main loop of openMSX.
Definition: Reactor.hh:68
void enterMainLoop()
Definition: Reactor.cc:487
RTScheduler & getRTScheduler()
Definition: Reactor.hh:81
Interpreter & getInterpreter()
Definition: Reactor.cc:317
InputEventGenerator & getInputEventGenerator()
Definition: Reactor.hh:85
constexpr double e
Definition: Math.hh:18
bool isMainThread()
Returns true when called from the main thread.
Definition: Thread.cc:15
This file implemented 3 utility functions:
Definition: Autofire.cc:9
EventType
Definition: Event.hh:506
EventType getType(const Event &event)
Definition: Event.hh:644
auto upper_bound(ForwardRange &&range, const T &value, Compare comp={}, Proj proj={})
Definition: ranges.hh:108
void swap(openmsx::MemBuffer< T > &l, openmsx::MemBuffer< T > &r) noexcept
Definition: MemBuffer.hh:202
auto rfind_unguarded(RANGE &range, const VAL &val, Proj proj={})
Similar to the find(_if)_unguarded functions above, but searches from the back to front.
Definition: stl.hh:100
constexpr bool contains(ITER first, ITER last, const VAL &val)
Check if a range contains a given value, using linear search.
Definition: stl.hh:23