fixing some concurrency issues; redo of locking concept in linked

This commit is contained in:
overflowerror 2021-04-29 00:03:40 +02:00
parent 5030fb80d9
commit ced0404557
5 changed files with 142 additions and 48 deletions

View file

@ -1,14 +1,14 @@
#include <semaphore.h>
#include <stdbool.h> #include <stdbool.h>
#include <stdlib.h> #include <stdlib.h>
#include <pthread.h>
#include "linked.h" #include "linked.h"
linkedList_t linked_create() { linkedList_t linked_create() {
linkedList_t list = { linkedList_t list = {
.lock = PTHREAD_MUTEX_INITIALIZER,
.first = NULL .first = NULL
}; };
sem_init(&(list.modify_sem), 0, 1);
return list; return list;
} }
@ -18,7 +18,7 @@ size_t linked_push(linkedList_t* list, const void* data) {
return -1; return -1;
new->data = (void*) data; new->data = (void*) data;
sem_init(&(new->modify_sem), 0, 1); pthread_mutex_init(&new->lock, NULL);
new->inUse = 0; new->inUse = 0;
new->list = list; new->list = list;
new->next = NULL; new->next = NULL;
@ -28,68 +28,83 @@ size_t linked_push(linkedList_t* list, const void* data) {
link_t* current = NULL; link_t* current = NULL;
sem_wait(&(list->modify_sem)); pthread_mutex_lock(&(list->lock));
current = list->first; current = list->first;
if (current == NULL) { if (current == NULL) {
new->prev = NULL; new->prev = NULL;
list->first = new; list->first = new;
sem_post(&(list->modify_sem)); pthread_mutex_unlock(&(list->lock));
return index; return index;
} }
sem_post(&(list->modify_sem)); pthread_mutex_lock(&(current->lock));
pthread_mutex_unlock(&(list->lock));
index++; index++;
sem_wait(&(current->modify_sem));
while(current->next != NULL) { while(current->next != NULL) {
sem_wait(&(current->next->modify_sem)); link_t* tmp = current->next;
sem_post(&(current->modify_sem)); pthread_mutex_lock(&(current->next->lock));
current = current->next; pthread_mutex_unlock(&(current->lock));
current = tmp;
index++; index++;
} }
new->prev = current; new->prev = current;
current->next = new; current->next = new;
sem_post(&(current->modify_sem)); pthread_mutex_unlock(&(current->lock));
return index; return index;
} }
link_t* linked_first(linkedList_t* list) { link_t* linked_first(linkedList_t* list) {
pthread_mutex_lock(&(list->lock));
if (list->first == NULL) { if (list->first == NULL) {
pthread_mutex_unlock(&(list->lock));
return NULL; return NULL;
} }
link_t* link = list->first; link_t* link = list->first;
pthread_mutex_lock(&(link->lock));
pthread_mutex_unlock(&(list->lock));
link->inUse++; link->inUse++;
pthread_mutex_unlock(&(link->lock));
return link; return link;
} }
// link has to be locked
void linked_free(link_t* link) { void linked_free(link_t* link) {
pthread_mutex_unlock(&(link->lock));
pthread_mutex_destroy(&(link->lock));
free(link); free(link);
} }
void linked_release(link_t* link) { // link has to be locked
bool linked_release(link_t* link) {
link->inUse--; link->inUse--;
if (link->unlinked && (link->inUse == 0)) { if (link->unlinked && (link->inUse == 0)) {
linked_free(link); linked_free(link);
return true;
} }
return false;
} }
link_t* linked_next(link_t* link) { link_t* linked_next(link_t* link) {
pthread_mutex_lock(&(link->lock));
link_t* next = link->next; link_t* next = link->next;
linked_release(link);
if (next == NULL) { if (next != NULL) {
return NULL; pthread_mutex_lock(&(next->lock));
next->inUse++;
pthread_mutex_unlock(&(next->lock));
}
if (!linked_release(link)) {
pthread_mutex_unlock(&(link->lock));
} }
link = next;
link->inUse++; return next;
return link;
} }
link_t* linked_get(linkedList_t* list, size_t index) { link_t* linked_get(linkedList_t* list, size_t index) {
@ -121,7 +136,7 @@ size_t linked_length(linkedList_t* list) {
int linked_unlink(link_t* link) { int linked_unlink(link_t* link) {
// need to get prev sem before link sem to avoid dead lock // need to get prev sem before link sem to avoid dead lock
sem_t* prevModify = NULL; pthread_mutex_t* prevLock;
link_t** prevNext = NULL; link_t** prevNext = NULL;
while(true) { while(true) {
@ -129,78 +144,100 @@ int linked_unlink(link_t* link) {
* Try to lock prev. If locked prev was modified try again. * Try to lock prev. If locked prev was modified try again.
* Abort if link is unlinked. * Abort if link is unlinked.
*/ */
pthread_mutex_lock(&(link->lock));
link_t* prev = link->prev; link_t* prev = link->prev;
pthread_mutex_unlock(&(link->lock));
if (prev == NULL) { if (prev == NULL) {
sem_wait(&(link->list->modify_sem)); pthread_mutex_lock(&(link->list->lock));
if (link->list->first != link) { if (link->list->first != link) {
sem_post(&(link->list->modify_sem)); pthread_mutex_unlock(&(link->list->lock));
if (link->unlinked) { if (link->unlinked) {
return -1; return -1;
} }
continue; continue;
} }
prevModify = &(link->list->modify_sem);
prevLock = &(link->list->lock);
prevNext = &(link->list->first); prevNext = &(link->list->first);
break; break;
} else { } else {
sem_wait(&(prev->modify_sem)); pthread_mutex_lock(&(prev->lock));
if (prev->next != link) { if (prev->next != link) {
sem_post(&(prev->modify_sem)); pthread_mutex_unlock(&(prev->lock));
if (link->unlinked) { if (link->unlinked) {
return - 1; return - 1;
} }
continue; continue;
} }
prevModify = &(prev->modify_sem); prevLock = &(prev->lock);
prevNext = &(prev->next); prevNext = &(prev->next);
break; break;
} }
} }
sem_wait(&(link->modify_sem)); // prev is locked, no danger of a deadlock -> lock current link
pthread_mutex_lock(&(link->lock));
if (link->unlinked) { if (link->unlinked) {
sem_post(&(link->modify_sem)); // already unlinked; release all locks and return
sem_post(prevModify);
pthread_mutex_unlock(&(link->lock));
pthread_mutex_unlock(prevLock);
return - 1; return - 1;
} }
if (link->next) { if (link->next) {
sem_wait(&(link->next->modify_sem)); pthread_mutex_lock(&(link->next->lock));
// we don't need to check for changes because link is already locked // we don't need to check for changes because link is already locked
// while we are here we can modify next // while we are here we can modify next
link->next->prev = link->prev; link->next->prev = link->prev;
// we don't need this one anymore // we don't need link->next anymore
sem_post(&(link->next->modify_sem)); pthread_mutex_unlock(&(link->next->lock));
} }
link->unlinked = true; link->unlinked = true;
*prevNext = link->next; *prevNext = link->next;
// all changes to prev are done -> unlock
pthread_mutex_unlock(prevLock);
// everything is done // everything is done
// check if the link is used; if so: unlock; if not: free
sem_post(&(link->modify_sem));
sem_post(prevModify);
if (link->inUse == 0) { if (link->inUse == 0) {
linked_free(link); linked_free(link);
} else {
pthread_mutex_unlock(&(link->lock));
} }
return 0; return 0;
} }
void linked_destroy(linkedList_t* list) { void linked_destroy(linkedList_t* list) {
// no new links can be added after locking the list
pthread_mutex_lock(&(list->lock));
link_t* link = list->first; link_t* link = list->first;
while(link != NULL) { while(link != NULL) {
pthread_mutex_lock(&(link->lock));
link_t* next = link->next; link_t* next = link->next;
// no new links can be added, so we can unlock link (next can't be changed)
pthread_mutex_unlock(&(link->lock));
linked_unlink(link); linked_unlink(link);
link = next; link = next;
} }
pthread_mutex_unlock(&(list->lock));
pthread_mutex_destroy(&(list->lock));
} }

View file

@ -1,18 +1,18 @@
#ifndef LINKED_H #ifndef LINKED_H
#define LINKED_H #define LINKED_H
#include <semaphore.h>
#include <stdbool.h> #include <stdbool.h>
#include <signal.h> #include <signal.h>
#include <pthread.h>
typedef struct linkedList { typedef struct linkedList {
sem_t modify_sem; pthread_mutex_t lock;
struct link* first; struct link* first;
} linkedList_t; } linkedList_t;
typedef struct link { typedef struct link {
void* data; void* data;
sem_t modify_sem; pthread_mutex_t lock;
volatile sig_atomic_t inUse; volatile sig_atomic_t inUse;
bool unlinked; bool unlinked;
struct link* next; struct link* next;
@ -25,7 +25,7 @@ size_t linked_push(linkedList_t* list, const void* data);
link_t* linked_first(linkedList_t* list); link_t* linked_first(linkedList_t* list);
link_t* linked_next(link_t* link); link_t* linked_next(link_t* link);
size_t linked_length(linkedList_t* list); size_t linked_length(linkedList_t* list);
void linked_release(link_t* link); bool linked_release(link_t* link);
link_t* linked_get(linkedList_t* list, size_t index); link_t* linked_get(linkedList_t* list, size_t index);
int linked_unlink(link_t* link); int linked_unlink(link_t* link);
void linked_destroy(linkedList_t* list); void linked_destroy(linkedList_t* list);

1
src/linked.s Normal file
View file

@ -0,0 +1 @@
.file "linked.c"

View file

@ -70,11 +70,13 @@ void cleanup() {
long diffms = timespacAgeMs(connection->timing.lastUpdate); long diffms = timespacAgeMs(connection->timing.lastUpdate);
pthread_mutex_lock(&(connection->lock));
if (connection->state != OPENED) { if (connection->state != OPENED) {
unlink = true; unlink = true;
} else if (diffms > networkingConfig.connectionTimeout) { } else if (diffms > networkingConfig.connectionTimeout) {
connection->state = ABORTED; connection->state = ABORTED;
} }
pthread_mutex_unlock(&(connection->lock));
if (unlink) { if (unlink) {
linked_push(&connectionsToFree, connection); linked_push(&connectionsToFree, connection);
@ -94,7 +96,11 @@ void cleanup() {
while(link != NULL) { while(link != NULL) {
length++; length++;
struct connection* connection = link->data; struct connection* connection = link->data;
if (connection->inUse == 0) {
pthread_mutex_lock(&(connection->lock));
if (connection->inUse == 0) {
linked_unlink(link);
freed++; freed++;
#ifdef SSL_SUPPORT #ifdef SSL_SUPPORT
@ -136,9 +142,12 @@ void cleanup() {
if (connection->peer.name != NULL) if (connection->peer.name != NULL)
free(connection->peer.name); free(connection->peer.name);
free(connection); pthread_mutex_unlock(&(connection->lock));
pthread_mutex_destroy(&(connection->lock));
linked_unlink(link); free(connection);
} else {
pthread_mutex_unlock(&(connection->lock));
} }
link = linked_next(link); link = linked_next(link);
@ -244,7 +253,9 @@ void safeEndConnection(struct connection* connection, bool force) {
close(connection->readfd); close(connection->readfd);
close(connection->writefd); close(connection->writefd);
pthread_mutex_lock(&(connection->lock));
connection->inUse--; connection->inUse--;
pthread_mutex_unlock(&(connection->lock));
} }
int sendHeader(int statusCode, struct headers* headers, struct request* request) { int sendHeader(int statusCode, struct headers* headers, struct request* request) {
@ -333,8 +344,12 @@ void* requestThread(void* data) {
if (pipe(&(pipefd[0])) < 0) { if (pipe(&(pipefd[0])) < 0) {
error("networking: Couldn't create reponse pipe: %s", strerror(errno)); error("networking: Couldn't create reponse pipe: %s", strerror(errno));
warn("Aborting request."); warn("Aborting request.");
pthread_mutex_lock(&(connection->lock));
connection->state = ABORTED; connection->state = ABORTED;
connection->inUse--; connection->inUse--;
pthread_mutex_unlock(&(connection->lock));
return NULL; return NULL;
} }
@ -348,8 +363,12 @@ void* requestThread(void* data) {
error("networking: Couldn't create reponse pipe: %s", strerror(errno)); error("networking: Couldn't create reponse pipe: %s", strerror(errno));
warn("Aborting request."); warn("Aborting request.");
pthread_mutex_lock(&(connection->lock));
connection->state = ABORTED; connection->state = ABORTED;
connection->inUse--; connection->inUse--;
pthread_mutex_unlock(&(connection->lock));
return NULL; return NULL;
} }
@ -365,8 +384,12 @@ void* requestThread(void* data) {
error("networking: Couldn't start helper thread."); error("networking: Couldn't start helper thread.");
warn("networking: Aborting request."); warn("networking: Aborting request.");
pthread_mutex_lock(&(connection->lock));
connection->state = ABORTED; connection->state = ABORTED;
connection->inUse--; connection->inUse--;
pthread_mutex_unlock(&(connection->lock));
return NULL; return NULL;
} }
if (startCopyThread(response, connection->writefd, false, &(connection->threads.helper[1])) < 0) { if (startCopyThread(response, connection->writefd, false, &(connection->threads.helper[1])) < 0) {
@ -377,8 +400,12 @@ void* requestThread(void* data) {
error("networking: Couldn't start helper thread."); error("networking: Couldn't start helper thread.");
warn("networking: Aborting request."); warn("networking: Aborting request.");
pthread_mutex_lock(&(connection->lock));
connection->state = ABORTED; connection->state = ABORTED;
connection->inUse--; connection->inUse--;
pthread_mutex_unlock(&(connection->lock));
return NULL; return NULL;
} }
@ -390,8 +417,12 @@ void* requestThread(void* data) {
error("networking: Couldn't start response thread."); error("networking: Couldn't start response thread.");
warn("networking: Aborting request."); warn("networking: Aborting request.");
pthread_mutex_lock(&(connection->lock));
connection->state = ABORTED; connection->state = ABORTED;
connection->inUse--; connection->inUse--;
pthread_mutex_unlock(&(connection->lock));
return NULL; return NULL;
} }
@ -413,14 +444,21 @@ void* requestThread(void* data) {
} }
void startRequestHandler(struct connection* connection) { void startRequestHandler(struct connection* connection) {
pthread_mutex_lock(&(connection->lock));
connection->inUse++; connection->inUse++;
pthread_mutex_unlock(&(connection->lock));
debug("networking: starting request handler"); debug("networking: starting request handler");
if (pthread_create(&(connection->threads.request), NULL, &requestThread, connection) != 0) { if (pthread_create(&(connection->threads.request), NULL, &requestThread, connection) != 0) {
error("networking: Couldn't start request thread."); error("networking: Couldn't start request thread.");
warn("networking: Aborting request."); warn("networking: Aborting request.");
pthread_mutex_lock(&(connection->lock));
connection->state = ABORTED; connection->state = ABORTED;
connection->inUse--; connection->inUse--;
pthread_mutex_unlock(&(connection->lock));
return; return;
} }
} }
@ -434,9 +472,15 @@ void dataHandler(int signo) {
for(link_t* link = linked_first(&connectionList); link != NULL; link = linked_next(link)) { for(link_t* link = linked_first(&connectionList); link != NULL; link = linked_next(link)) {
debug("networking: connection %p", link); debug("networking: connection %p", link);
struct connection* connection = link->data; struct connection* connection = link->data;
if (connection->state != OPENED)
pthread_mutex_lock(&(connection->lock));
if (connection->state != OPENED) {
pthread_mutex_unlock(&(connection->lock));
continue; continue;
}
connection->inUse++; connection->inUse++;
pthread_mutex_unlock(&(connection->lock));
int tmp; int tmp;
char c; char c;
char buffer[BUFFER_LENGTH]; char buffer[BUFFER_LENGTH];
@ -480,7 +524,11 @@ void dataHandler(int signo) {
connection->currentHeader = NULL; connection->currentHeader = NULL;
debug("networking: headers complete"); debug("networking: headers complete");
pthread_mutex_lock(&(connection->lock));
connection->state = PROCESSING; connection->state = PROCESSING;
pthread_mutex_unlock(&(connection->lock));
updateTiming(connection, true); updateTiming(connection, true);
startRequestHandler(connection); startRequestHandler(connection);
break; break;
@ -549,10 +597,15 @@ void dataHandler(int signo) {
debug("networking: dropping connection"); debug("networking: dropping connection");
setSIGIO(connection->readfd, false); setSIGIO(connection->readfd, false);
pthread_mutex_lock(&(connection->lock));
connection->state = ABORTED; connection->state = ABORTED;
pthread_mutex_unlock(&(connection->lock));
} }
pthread_mutex_lock(&(connection->lock));
connection->inUse--; connection->inUse--;
pthread_mutex_unlock(&(connection->lock));
} }
} }
void* dataThread(void* ignore) { void* dataThread(void* ignore) {
@ -746,6 +799,7 @@ void* listenThread(void* _bind) {
setSIGIO(tmp, true); setSIGIO(tmp, true);
#endif #endif
// lock doesn't yet exist
connection->state = OPENED; connection->state = OPENED;
connection->peer = peer; connection->peer = peer;
connection->bind = bindObj; connection->bind = bindObj;
@ -773,6 +827,7 @@ void* listenThread(void* _bind) {
connection->currentHeaderLength = 0; connection->currentHeaderLength = 0;
connection->currentHeader = NULL; connection->currentHeader = NULL;
connection->inUse = 0; connection->inUse = 0;
pthread_mutex_init(&connection->lock, NULL);
updateTiming(connection, false); updateTiming(connection, false);
linked_push(&connectionList, connection); linked_push(&connectionList, connection);

View file

@ -47,6 +47,7 @@ struct connection {
enum connectionState state; enum connectionState state;
struct peer peer; struct peer peer;
struct bind* bind; struct bind* bind;
pthread_mutex_t lock;
volatile sig_atomic_t inUse; volatile sig_atomic_t inUse;
int readfd; int readfd;
int writefd; int writefd;