WIP: gnu-pth transistion, socket accepting
authorChristian Thaeter <ct@pipapo.org>
Sun, 21 Oct 2007 17:30:58 +0000 (19:30 +0200)
committerChristian Thaeter <ct@pipapo.org>
Sun, 21 Oct 2007 17:30:58 +0000 (19:30 +0200)
Makefile.am
src/main.c
src/rxpd.h
src/rxpd_base.c
src/rxpd_connection.c
src/rxpd_socket.c

index 543dafb..79947cc 100644 (file)
@@ -21,8 +21,8 @@ rxpd_srcdir = $(top_srcdir)/src
 
 bin_PROGRAMS = rxpd
 
-rxpd_CFLAGS = $(CFLAGS) -std=gnu99 -Wall -Wextra
-rxpd_LDADD = -levent
+rxpd_CFLAGS = $(CFLAGS) -std=gnu99 -Wall -Wextra -Werror
+rxpd_LDADD = -lpth -levent
 
 rxpd_SOURCES =                                 \
        $(rxpd_srcdir)/rxpd.h                   \
index 39eba2c..74c33e4 100644 (file)
@@ -74,11 +74,14 @@ usage (void)
 int
 main (int argc, char** argv)
 {
+  if (pth_init() == FALSE)
+    rxpd_die ("pth initialization failed\n");
+
   struct rxpd_base* rxpd;
 
   openlog (PACKAGE_NAME, LOG_PID, LOG_DAEMON);
 
-  rxpd = rxpd_init (event_init ());
+  rxpd = rxpd_init ();
 
   rxpd_log (rxpd, LOG_NOTICE, PACKAGE_STRING" starting up\n");
 
@@ -201,13 +204,18 @@ main (int argc, char** argv)
   LLIST_FOREACH (&rxpd->sockets, n)
     {
       struct rxpd_socket* socket = (struct rxpd_socket*)n;
-      rxpd_socket_schedule (socket);
+      rxpd_socket_spawn (socket);
+    }
+
+  LLIST_FOREACH (&rxpd->sockets, n)
+    {
+      struct rxpd_socket* socket = (struct rxpd_socket*)n;
+      rxpd_socket_delete (rxpd_socket_join (socket));
     }
 
-  // eventloop
-  event_dispatch ();
 
   rxpd_log (rxpd, LOG_NOTICE, PACKAGE_STRING" exited\n");
   rxpd_destroy ();
+
   return EXIT_SUCCESS;
 }
index ac3e39d..ad2ff70 100644 (file)
@@ -38,6 +38,7 @@
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <event.h>
+#include <pth.h>
 #include <time.h>
 #include <netdb.h>
 
@@ -85,7 +86,6 @@ struct rxpd_base
   int daemonize;
 
   struct rxpd_file* policy;
-  struct event_base* eventbase;
 
   // TODO
   //FILE* -l log      log hits to logfile
@@ -95,7 +95,7 @@ struct rxpd_base
 };
 
 struct rxpd_base*
-rxpd_init (struct event_base* eventbase);
+rxpd_init (void);
 
 void
 rxpd_destroy (void);
@@ -167,6 +167,7 @@ struct rxpd_socket
   int fd;
   struct event ev;
   struct rxpd_base* base;
+  pth_t accepter;
   int (*rxpd_socket_addr)(struct rxpd_connection* conn, char* dst, const char* pfx, size_t size);
 };
 
@@ -184,15 +185,14 @@ rxpd_socket_tcp4addr (struct rxpd_connection* conn, char* dst, const char* pfx,
 void
 rxpd_socket_delete (struct rxpd_socket* self);
 
-void
-rxpd_socket_accept (int sock, short event, void* ptr);
+void *
+rxpd_socket_accept (void* ptr);
 
 struct rxpd_socket*
-rxpd_socket_schedule (struct rxpd_socket* self);
+rxpd_socket_spawn (struct rxpd_socket* self);
 
 struct rxpd_socket*
-rxpd_socket_suspend (struct rxpd_socket* self);
-
+rxpd_socket_join (struct rxpd_socket* self);
 
 //
 
@@ -244,7 +244,7 @@ struct rxpd_connection
 
 
 struct rxpd_connection*
-rxpd_connection_new (struct rxpd_socket* socket, int accept_fd);
+rxpd_connection_new (struct rxpd_socket* socket);
 
 void
 rxpd_connection_delete (struct rxpd_connection* self);
index 60566f0..6867734 100644 (file)
@@ -24,7 +24,7 @@
 static struct rxpd_base global_base;
 
 struct rxpd_base*
-rxpd_init (struct event_base* eventbase)
+rxpd_init (void)
 {
   if (global_base.basedir)
     return NULL;
@@ -36,12 +36,7 @@ rxpd_init (struct event_base* eventbase)
   global_base.regflags = 0;
   global_base.policy = NULL;
 
-  if (!eventbase)
-    rxpd_die ("no eventbase provided");
-
-  global_base.eventbase = eventbase;
-
-  psplay_init_root (&global_base.files, rxpd_file_cmp, rxpd_file_delete);
+  psplay_init_root (&global_base.files, rxpd_file_cmp, (psplay_delete_t)rxpd_file_delete);
   llist_init (&global_base.sockets);
 
   rxpd_log (&global_base, LOG_DEBUG, PACKAGE_NAME" initialized\n");
index 40c1526..0be7f2b 100644 (file)
 #include "rxpd.h"
 
 struct rxpd_connection*
-rxpd_connection_new (struct rxpd_socket* socket, int fd)
+rxpd_connection_new (struct rxpd_socket* socket)
 {
+  int fd = -1;
+
   struct rxpd_connection* self;
   self = rxpd_malloc (sizeof (struct rxpd_connection));
 
index add7f7b..278261c 100644 (file)
@@ -62,7 +62,7 @@ rxpd_socket_new_tcp4 (struct rxpd_base* base, const char* addr, unsigned short p
 
   self->rxpd_socket_addr = rxpd_socket_tcp4addr;
 
-  event_set (&self->ev, self->fd, EV_READ, rxpd_socket_accept, self);
+  self->accepter = NULL;
   llist_insert_tail (&base->sockets, &self->node);
 
   rxpd_log (base, LOG_INFO, "Listening on tcp4:%d\n", port);
@@ -100,35 +100,52 @@ rxpd_socket_delete (struct rxpd_socket* self)
 }
 
 struct rxpd_socket*
-rxpd_socket_schedule (struct rxpd_socket* self)
+rxpd_socket_join (struct rxpd_socket* self)
 {
-  if (self)
-    {
-      event_add (&self->ev, NULL);
-    }
+  pth_join (self->accepter, NULL);
+  self->accepter = NULL;
   return self;
 }
 
 struct rxpd_socket*
-rxpd_socket_suspend (struct rxpd_socket* self)
+rxpd_socket_spawn (struct rxpd_socket* self)
 {
+      rxpd_log (NULL, LOG_NOTICE, "socket spawn\n");
   if (self)
     {
-      event_del (&self->ev);
+      if (self->accepter)
+        rxpd_die ("socket thread already spawned\n");
+
+      self->accepter = pth_spawn (PTH_ATTR_DEFAULT, rxpd_socket_accept, self);
+
+      if (!self->accepter)
+        rxpd_die ("failed spawning thread\n");
     }
   return self;
 }
 
-void
-rxpd_socket_accept (int fd, short event, void* ptr)
+void *
+rxpd_socket_accept (void* ptr)
 {
-  (void) event;
   struct rxpd_socket* self = ptr;
 
-  struct rxpd_connection* conn =
-    rxpd_connection_new (self, fd);
+  pth_event_t ev = pth_event (PTH_EVENT_FD|PTH_UNTIL_FD_READABLE, self->fd);
+
+      rxpd_log (NULL, LOG_NOTICE, "pre ACCEPT\n");
+
+  while (pth_wait (ev))
+    {
+      rxpd_log (NULL, LOG_NOTICE, "ACCEPT\n");
+
+  //  struct rxpd_connection* conn =
+      rxpd_connection_new (self);
+    }
+
+  rxpd_log (NULL, LOG_NOTICE, "closed\n");
 
-  rxpd_connection_schedule (conn);
-  rxpd_socket_schedule (self);
+  pth_event_free (ev, PTH_FREE_ALL);
+  //rxpd_connection_schedule (conn);
+  //rxpd_socket_schedule (self);
+  return NULL;
 }