struct rxpd_connection
{
int fd;
- struct event ev;
+ pth_t connecter;
struct rxpd_file* file;
struct rxpd_socket* socket;
char* tmp_str;
rxpd_connection_delete (struct rxpd_connection* self);
struct rxpd_connection*
-rxpd_connection_schedule (struct rxpd_connection* self);
+rxpd_connection_spawn (struct rxpd_connection* self);
int
rxpd_connection_readline (struct rxpd_connection* self);
int
rxpd_connection_check_policy (struct rxpd_connection* self, char* line);
-void
-rxpd_connection_parse_cmd (int fd, short event, void* ptr);
+void*
+rxpd_connection_parse_cmd (void* ptr);
/* generate prototypes for each defined command */
#define RXPD_CMD(cmd, _) void rxpd_connection_cmd_##cmd (int fd, short event, void* ptr);
struct rxpd_connection*
rxpd_connection_new (struct rxpd_socket* socket)
{
- int fd = -1;
-
struct rxpd_connection* self;
self = rxpd_malloc (sizeof (struct rxpd_connection));
- self->fd = accept (fd, NULL, 0);
+ self->fd = accept (socket->fd, NULL, 0);
if (self->fd == -1)
abort ();
rxpd_buffer_init (&self->in, self);
rxpd_buffer_init (&self->out, self);
- event_set (&self->ev, self->fd, EV_READ, rxpd_connection_parse_cmd, self);
+ self->connecter = NULL;
// TODO more info
rxpd_log (socket->base, LOG_INFO, "incoming connection\n");
{
if (self)
{
- event_del (&self->ev);
+ // TODO kill connecter if not self
+
close (self->fd);
free (self->tmp_str);
LLIST_WHILE_HEAD (&self->tmp_list, n)
}
struct rxpd_connection*
-rxpd_connection_schedule (struct rxpd_connection* self)
+rxpd_connection_spawn (struct rxpd_connection* self)
{
if (self)
{
- event_add (&self->ev, NULL);
+ if (self->connecter)
+ rxpd_die ("connection thread already spawned\n");
+
+ pth_attr_t attr = pth_attr_new ();
+
+ pth_attr_set (attr, PTH_ATTR_JOINABLE, FALSE);
+
+ self->connecter = pth_spawn (attr, rxpd_connection_parse_cmd, self);
+
+ if (!self->connecter)
+ rxpd_die ("failed spawning thread\n");
}
return self;
}
return 1;
}
-void
-rxpd_connection_parse_cmd (int fd, short event, void* ptr)
+void*
+rxpd_connection_parse_cmd (void* ptr)
{
- (void) event;
-
struct rxpd_connection* self = (struct rxpd_connection*) ptr;
struct rxpd_base* base = self->socket->base;
{
rxpd_log (base, LOG_ERR, "no data\n");
rxpd_buffer_printf (&self->out, "#ERROR: no data\n");
- close (fd);
- return;
+ close (self->fd);
+ return NULL;
}
rxpd_log (base, LOG_DEBUG, "parse command '%s'\n", line);
rxpd_log (base, LOG_ERR, "no command\n");
rxpd_buffer_printf (&self->out, "#ERROR: no command\n");
rxpd_connection_delete (self);
- return;
+ return NULL;
}
if (!rxpd_connection_check_policy (self, line))
{
rxpd_buffer_printf (&self->out, "#ERROR: access denied\n");
rxpd_connection_delete (self);
- return;
+ return NULL;
}
if (line[i->sz])
rxpd_log (base, LOG_ERR, "illeagal filename\n");
rxpd_buffer_printf (&self->out, "#ERROR: illegal filename\n");
rxpd_connection_delete (self);
- return;
+ return NULL;
}
}
}
switch (i->nr)
{
#define RXPD_CMD(cmd, _) \
-case RXPD_CMD_##cmd: \
- event_set (&self->ev, self->fd, EV_READ, rxpd_connection_cmd_##cmd, self); \
- rxpd_connection_cmd_##cmd (fd, 0, ptr); \
+case RXPD_CMD_##cmd:
+
+
+ //event_set (&self->ev, self->fd, EV_READ, rxpd_connection_cmd_##cmd, self);
+ //rxpd_connection_cmd_##cmd (fd, 0, ptr);
break;
RXPD_COMMANDS
#undef RXPD_CMD
+ ;
}
+ return NULL;
}
pth_event_t ev = pth_event (PTH_EVENT_FD|PTH_UNTIL_FD_READABLE, self->fd);
- rxpd_log (NULL, LOG_NOTICE, "pre ACCEPT\n");
+ rxpd_log (NULL, LOG_NOTICE, "pre ACCEPT\n");
+ // TODO cancel thread to leave the loop?
while (pth_wait (ev))
{
rxpd_log (NULL, LOG_NOTICE, "ACCEPT\n");
- // struct rxpd_connection* conn =
- rxpd_connection_new (self);
+ struct rxpd_connection* conn =
+ rxpd_connection_new (self);
+ rxpd_connection_spawn (conn);
}
rxpd_log (NULL, LOG_NOTICE, "closed\n");
- pth_event_free (ev, PTH_FREE_ALL);
- //rxpd_connection_schedule (conn);
- //rxpd_socket_schedule (self);
return NULL;
}