Initial draft of pthread-based impl

This commit is contained in:
Michal Moskal 2017-07-03 13:57:34 +02:00
parent a1fc4290f4
commit f0a7990efa
3 changed files with 125 additions and 58 deletions

View File

@ -19,7 +19,7 @@ DEPS = $(PXT_HEADERS) package.json Makefile Makefile.inc
all: $(EXE) all: $(EXE)
$(EXE): $(PXT_OBJS) $(EXE): $(PXT_OBJS)
$(LD) -o $(EXE) $(LDFLAGS) -Wl,-Map,$(EXE:.elf=.map) $(PXT_OBJS) $(LIBSTDCPP) -lm $(NPM_LIBS) $(LD) -o $(EXE) $(LDFLAGS) -Wl,-Map,$(EXE:.elf=.map) $(PXT_OBJS) $(LIBSTDCPP) -lm -lpthread $(NPM_LIBS)
cp $(EXE) $(EXE:.elf=.full) cp $(EXE) $(EXE:.elf=.full)
$(PREF)strip $(EXE) $(PREF)strip $(EXE)
node -p 'require("fs").readFileSync("$(EXE)").toString("hex")' > $(HEX) node -p 'require("fs").readFileSync("$(EXE)").toString("hex")' > $(HEX)

View File

@ -5,40 +5,11 @@
#include <sys/time.h> #include <sys/time.h>
#include <time.h> #include <time.h>
#include <cstdarg> #include <cstdarg>
#include <pthread.h>
#include <assert.h>
#include <map>
#include <pth.h> #define DEVICE_EVT_ANY 0
// TODO set JOINABLE to false
static int startTime;
static map<pair<int, int>, Action> handlersMap;
static pth_msgport_t evMsgPort;
struct Thread {
struct Thread *next;
Action act;
TValue arg0;
pth_t pid;
pth_msgport_t waitEventPort;
int waitSource;
int waitValue;
};
static struct Thread *allThreads;
struct Event {
pth_message_t msg;
int source;
int value;
};
Event *mkEvent(int source, int value) {
auto res = new Event();
memset(res, 0, sizeof(Event));
res->source = source;
res->value = value;
return res;
}
void dmesg(const char *format, ...) { void dmesg(const char *format, ...) {
char buf[500]; char buf[500];
@ -53,6 +24,43 @@ void dmesg(const char *format, ...) {
namespace pxt { namespace pxt {
static int startTime;
static std::map<std::pair<int, int>, Action> handlersMap;
static pthread_mutex_t execMutex;
static pthread_t execThread;
static pthread_cond_t newEventBroadcast;
static pthread_mutex_t newEventMutex;
struct Thread {
struct Thread *next;
Action act;
TValue arg0;
pthread_t pid;
pthread_cond_t waitCond;
int waitSource;
int waitValue;
};
static struct Thread *allThreads;
static struct Event *eventHead, *eventTail;
struct Event {
struct Event *next;
int source;
int value;
};
Event lastEvent;
Event *mkEvent(int source, int value) {
auto res = new Event();
memset(res, 0, sizeof(Event));
res->source = source;
res->value = value;
return res;
}
void sendSerial(const char *data, int len) { void sendSerial(const char *data, int len) {
fwrite(data, 1, len, stderr); fwrite(data, 1, len, stderr);
} }
@ -68,15 +76,38 @@ extern "C" void target_reset() {
exit(0); exit(0);
} }
void checkUserMode() {
assert(execThread == pthread_self());
}
void startUser() {
assert(execThread != pthread_self());
pthread_mutex_lock(&execMutex);
execThread = pthread_self();
}
void stopUser() {
assert(execThread == pthread_self());
execThread = 0;
pthread_mutex_unlock(&execMutex);
}
void sleep_ms(uint32_t ms) { void sleep_ms(uint32_t ms) {
struct timeval tv; stopUser();
tv.tv_sec = ms / 1000;
tv.tv_usec = (ms % 1000) * 1000; struct timespec ts;
pth_nap(tv); ts.tv_sec = ms / 1000;
ts.tv_nsec = (ms % 1000) * 1000000;
while (nanosleep(&ts, &ts))
;
startUser();
} }
void sleep_us(uint64_t us) { void sleep_us(uint64_t us) {
if (us > 20000) { checkUserMode();
if (us > 10000) {
sleep_ms(us / 1000); sleep_ms(us / 1000);
} }
struct timespec ts; struct timespec ts;
@ -112,25 +143,30 @@ void disposeThread(Thread *t) {
} }
} }
decr(t->act); decr(t->act);
pth_msgport_destroy(t->waitEventPort); pthread_cond_destroy(&t->waitCond);
delete t; delete t;
} }
static void runAct(Thread *thr) { static void runAct(Thread *thr) {
startUser();
pxt::runAction1(thr->act, thr->arg0); pxt::runAction1(thr->act, thr->arg0);
stopUser();
disposeThread(thr); disposeThread(thr);
} }
void setupThread(Action a, TValue arg = 0, void (*runner)(Thread *) = runAct) { void setupThread(Action a, TValue arg = 0, void (*runner)(Thread *) = NULL) {
if (runner == NULL)
runner = runAct;
auto thr = new Thread(); auto thr = new Thread();
memset(thr, 0, sizeof(Thread)); memset(thr, 0, sizeof(Thread));
thr->next = allThreads; thr->next = allThreads;
allThreads = thr; allThreads = thr;
thr->act = a; thr->act = a;
thr->arg0 = a; thr->arg0 = a;
thr->waitEventPort = pth_msgport_create(NULL); pthread_cond_init(&thr->waitCond, NULL);
incr(a); incr(a);
pth_spawn(PTH_ATTR_DEFAULT, runner, thr); pthread_create(&thr->pid, NULL, (void *(*)(void *))runner, thr);
pthread_detach(thr->pid);
} }
void runInBackground(Action a) { void runInBackground(Action a) {
@ -138,6 +174,7 @@ void runInBackground(Action a) {
} }
static void runFor(Thread *t) { static void runFor(Thread *t) {
startUser();
while (true) { while (true) {
pxt::runAction0(t->act); pxt::runAction0(t->act);
sleep_ms(20); sleep_ms(20);
@ -148,36 +185,55 @@ void runForever(Action a) {
setupThread(a, 0, runFor); setupThread(a, 0, runFor);
} }
void waitForEvent(int id, int event) { void waitForEvent(int source, int value) {
// TODO auto self = pthread_self();
for (auto t = allThreads; t; t = t->next) {
if (t->pid == self) {
t->waitSource = source;
t->waitValue = value;
stopUser();
// spourious wake ups may occur they say
while (t->waitSource) {
pthread_mutex_lock(&newEventMutex);
pthread_cond_wait(&t->waitCond, &newEventMutex);
}
startUser();
return;
}
}
assert(0);
} }
static void dispatchEvent(Event &e) { static void dispatchEvent(Event &e) {
// lastEvent = e; lastEvent = e;
Action curr = handlersMap[{e.source, e.value}]; Action curr = handlersMap[{e.source, e.value}];
if (curr) if (curr)
runAction1(curr, fromInt(e.value)); setupThread(curr, fromInt(e.value));
curr = handlersMap[{e.source, DEVICE_EVT_ANY}]; curr = handlersMap[{e.source, DEVICE_EVT_ANY}];
if (curr) if (curr)
runAction1(curr, fromInt(e.value)); setupThread(curr, fromInt(e.value));
} }
static void *evtDispatcher(void *dummy) { static void *evtDispatcher(void *dummy) {
pth_event_t msgEv = pth_event(PTH_EVENT_MSG, evMsgPort); pthread_mutex_lock(&newEventMutex);
Event *ev;
while (true) { while (true) {
pth_wait(msgEv); pthread_cond_wait(&newEventBroadcast, &newEventMutex);
while (NULL != (ev = (Event *)pth_msgport_get(evMsgPort))) { while (eventHead != NULL) {
Event *ev = eventHead;
eventHead = ev->next;
if (eventHead == NULL)
eventTail = NULL;
for (auto thr = allThreads; thr; thr = thr->next) { for (auto thr = allThreads; thr; thr = thr->next) {
if (thr->waitSource == ev->source && if (thr->waitSource == ev->source &&
(thr->waitValue == ev->value || thr->waitValue == DEVICE_EVT_ANY)) { (thr->waitValue == ev->value || thr->waitValue == DEVICE_EVT_ANY)) {
Event *copy = mkEvent(ev->source, ev->value);
pth_msgport_put(thr->waitEventPort, copy);
thr->waitSource = 0; // once! thr->waitSource = 0; // once!
pthread_cond_broadcast(&thr->waitCond);
} }
} }
dispatchEvent(*ev); dispatchEvent(*ev);
delete ev; delete ev;
} }
@ -185,8 +241,18 @@ static void *evtDispatcher(void *dummy) {
} }
void raiseEvent(int id, int event) { void raiseEvent(int id, int event) {
checkUserMode();
auto e = mkEvent(id, event); auto e = mkEvent(id, event);
pth_msgport_put(evMsgPort, e); pthread_mutex_lock(&newEventMutex);
if (eventTail == NULL) {
assert(eventHead == NULL);
eventHead = eventTail = e;
} else {
eventTail->next = e;
eventTail = e;
}
pthread_cond_broadcast(&newEventBroadcast);
pthread_mutex_unlock(&newEventMutex);
} }
void registerWithDal(int id, int event, Action a) { void registerWithDal(int id, int event, Action a) {
@ -209,6 +275,8 @@ void dumpDmesg() {
void initRuntime() { void initRuntime() {
startTime = currTime(); startTime = currTime();
pth_init(); pthread_t disp;
pthread_create(&disp, NULL, evtDispatcher, NULL);
pthread_detach(disp);
} }
} }

View File

@ -13,8 +13,7 @@
"test.ts" "test.ts"
], ],
"npmDependencies": { "npmDependencies": {
"pxt-linux-ev3-api": "1.0.0", "pxt-linux-ev3-api": "1.0.0"
"pxt-linux-pth": "2.0.7"
}, },
"public": true, "public": true,
"dependencies": { "dependencies": {