WIP: pth transition, 2nd step
authorChristian Thaeter <ct@pipapo.org>
Sun, 21 Oct 2007 20:01:33 +0000 (22:01 +0200)
committerChristian Thaeter <ct@pipapo.org>
Sun, 21 Oct 2007 20:01:33 +0000 (22:01 +0200)
src/rxpd.h
src/rxpd_connection.c
src/rxpd_socket.c

index ad2ff70..774aa27 100644 (file)
@@ -232,7 +232,7 @@ rxpd_buffer_state (struct rxpd_buffer* self)
 struct rxpd_connection
 {
   int fd;
-  struct event ev;
+  pth_t connecter;
   struct rxpd_file* file;
   struct rxpd_socket* socket;
   char* tmp_str;
@@ -250,7 +250,7 @@ void
 rxpd_connection_delete (struct rxpd_connection* self);
 
 struct rxpd_connection*
-rxpd_connection_schedule (struct rxpd_connection* self);
+rxpd_connection_spawn (struct rxpd_connection* self);
 
 int
 rxpd_connection_readline (struct rxpd_connection* self);
@@ -258,8 +258,8 @@ rxpd_connection_readline (struct rxpd_connection* self);
 int
 rxpd_connection_check_policy (struct rxpd_connection* self, char* line);
 
-void
-rxpd_connection_parse_cmd (int fd, short event, void* ptr);
+void*
+rxpd_connection_parse_cmd (void* ptr);
 
 /* generate prototypes for each defined command */
 #define RXPD_CMD(cmd, _) void rxpd_connection_cmd_##cmd (int fd, short event, void* ptr);
index 0be7f2b..bf06dbd 100644 (file)
 struct rxpd_connection*
 rxpd_connection_new (struct rxpd_socket* socket)
 {
-  int fd = -1;
-
   struct rxpd_connection* self;
   self = rxpd_malloc (sizeof (struct rxpd_connection));
 
-  self->fd = accept (fd, NULL, 0);
+  self->fd = accept (socket->fd, NULL, 0);
   if (self->fd == -1)
     abort ();
 
@@ -41,7 +39,7 @@ rxpd_connection_new (struct rxpd_socket* socket)
   rxpd_buffer_init (&self->in, self);
   rxpd_buffer_init (&self->out, self);
   
-  event_set (&self->ev, self->fd, EV_READ, rxpd_connection_parse_cmd, self);
+  self->connecter = NULL;
 
   // TODO more info
   rxpd_log (socket->base, LOG_INFO, "incoming connection\n");
@@ -53,7 +51,8 @@ rxpd_connection_delete (struct rxpd_connection* self)
 {
   if (self)
     {
-      event_del (&self->ev);
+      // TODO kill connecter if not self
+
       close (self->fd);
       free (self->tmp_str);
       LLIST_WHILE_HEAD (&self->tmp_list, n)
@@ -66,11 +65,21 @@ rxpd_connection_delete (struct rxpd_connection* self)
 }
 
 struct rxpd_connection*
-rxpd_connection_schedule (struct rxpd_connection* self)
+rxpd_connection_spawn (struct rxpd_connection* self)
 {
   if (self)
     {
-      event_add (&self->ev, NULL);
+      if (self->connecter)
+        rxpd_die ("connection thread already spawned\n");
+
+      pth_attr_t attr = pth_attr_new ();
+
+      pth_attr_set (attr, PTH_ATTR_JOINABLE, FALSE);
+
+      self->connecter = pth_spawn (attr, rxpd_connection_parse_cmd, self);
+
+      if (!self->connecter)
+        rxpd_die ("failed spawning thread\n");
     }
   return self;
 }
@@ -115,11 +124,9 @@ rxpd_connection_check_policy (struct rxpd_connection* self, char* line)
   return 1;
 }
 
-void
-rxpd_connection_parse_cmd (int fd, short event, void* ptr)
+void*
+rxpd_connection_parse_cmd (void* ptr)
 {
-  (void) event;
-
   struct rxpd_connection* self = (struct rxpd_connection*) ptr;
   struct rxpd_base* base = self->socket->base;
 
@@ -130,8 +137,8 @@ rxpd_connection_parse_cmd (int fd, short event, void* ptr)
     {
       rxpd_log (base, LOG_ERR, "no data\n");
       rxpd_buffer_printf (&self->out, "#ERROR: no data\n");
-      close (fd);
-      return;
+      close (self->fd);
+      return NULL;
     }
 
   rxpd_log (base, LOG_DEBUG, "parse command '%s'\n", line);
@@ -158,14 +165,14 @@ rxpd_connection_parse_cmd (int fd, short event, void* ptr)
       rxpd_log (base, LOG_ERR, "no command\n");
       rxpd_buffer_printf (&self->out, "#ERROR: no command\n");
       rxpd_connection_delete (self);
-      return;
+      return NULL;
     }
 
   if (!rxpd_connection_check_policy (self, line))
     {
       rxpd_buffer_printf (&self->out, "#ERROR: access denied\n");
       rxpd_connection_delete (self);
-      return;
+      return NULL;
     }
 
   if (line[i->sz])
@@ -181,7 +188,7 @@ rxpd_connection_parse_cmd (int fd, short event, void* ptr)
               rxpd_log (base, LOG_ERR, "illeagal filename\n");
               rxpd_buffer_printf (&self->out, "#ERROR: illegal filename\n");
               rxpd_connection_delete (self);
-              return;
+              return NULL;
             }
         }
     }
@@ -190,12 +197,16 @@ rxpd_connection_parse_cmd (int fd, short event, void* ptr)
   switch (i->nr)
     {
 #define RXPD_CMD(cmd, _)                                                        \
-case RXPD_CMD_##cmd:                                                            \
-  event_set (&self->ev, self->fd, EV_READ, rxpd_connection_cmd_##cmd, self);    \
-  rxpd_connection_cmd_##cmd (fd, 0, ptr);                                       \
+case RXPD_CMD_##cmd:                                                            
+
+
+  //event_set (&self->ev, self->fd, EV_READ, rxpd_connection_cmd_##cmd, self);    
+  //rxpd_connection_cmd_##cmd (fd, 0, ptr);                                       
   break;
       RXPD_COMMANDS
 #undef RXPD_CMD
+        ;
     }
+  return NULL;
 }
 
index 278261c..263e9ee 100644 (file)
@@ -131,21 +131,20 @@ rxpd_socket_accept (void* ptr)
 
   pth_event_t ev = pth_event (PTH_EVENT_FD|PTH_UNTIL_FD_READABLE, self->fd);
 
-      rxpd_log (NULL, LOG_NOTICE, "pre ACCEPT\n");
+  rxpd_log (NULL, LOG_NOTICE, "pre ACCEPT\n");
 
+  // TODO cancel thread to leave the loop?
   while (pth_wait (ev))
     {
       rxpd_log (NULL, LOG_NOTICE, "ACCEPT\n");
 
-  //  struct rxpd_connection* conn =
-      rxpd_connection_new (self);
+      struct rxpd_connection* conn =
+        rxpd_connection_new (self);
+      rxpd_connection_spawn (conn);
     }
 
   rxpd_log (NULL, LOG_NOTICE, "closed\n");
 
-  pth_event_free (ev, PTH_FREE_ALL);
-  //rxpd_connection_schedule (conn);
-  //rxpd_socket_schedule (self);
   return NULL;
 }