APPEND/PREPEND commands
[rxpd] / rxpd.c
diff --git a/rxpd.c b/rxpd.c
index 578099e..2fe7b77 100644 (file)
--- a/rxpd.c
+++ b/rxpd.c
@@ -21,7 +21,7 @@
 
 #include "rxpd.h"
 
-static struct rxpd_base global_base = {NULL};
+static struct rxpd_base global_base;
 
 struct rxpd_base*
 rxpd_init (char* rulesdir)
@@ -85,27 +85,32 @@ rxpd_rule_new (const char* buf)
       if (*buf != '#')
         {
           int err;
-          char* rxstart = strchr (buf, ':') + 1;
+          char* rxstart = strchr (buf, ':');
 
-          err = regcomp (&self->rx, rxstart, REG_EXTENDED|REG_ICASE|REG_NOSUB);
-
-          if (!err)
-            {
-              self->string = strdup (buf);
-              if (!self->string) abort();
-            }
+          if (!rxstart)
+            self->string = strdup ("#ERROR: Syntax error, line was neither a comment nor a rule");
           else
             {
-              regfree (&self->rx);
-              char ebuf[256];
-              size_t len = regerror (err, NULL, ebuf, 256);
-              self->string = malloc(len + strlen(buf) + 9);
-              if (!self->string) abort();
-              strcpy (self->string, "# ");
-              strcat (self->string, ebuf);
-              strcat (self->string, " in '");
-              strcat (self->string, buf);
-              strcat (self->string, "'");
+              err = regcomp (&self->rx, rxstart+1, REG_EXTENDED|REG_ICASE|REG_NOSUB);
+
+              if (!err)
+                {
+                  self->string = strdup (buf);
+                  if (!self->string) abort();
+                }
+              else
+                {
+                  regfree (&self->rx);
+                  char ebuf[256];
+                  size_t len = regerror (err, NULL, ebuf, 256);
+                  self->string = malloc(len + strlen(buf) + 14);
+                  if (!self->string) abort();
+                  strcpy (self->string, "#ERROR: ");
+                  strcat (self->string, ebuf);
+                  strcat (self->string, " in '");
+                  strcat (self->string, buf);
+                  strcat (self->string, "'");
+                }
             }
         }
       else
@@ -133,54 +138,39 @@ rxpd_rule_delete (struct rxpd_rule* rule)
 //
 
 
-int
-rxpd_file_load (struct rxpd_base* base, const char* filename)
+struct rxpd_file*
+rxpd_file_new (struct rxpd_base* base, const char* filename)
 {
-  char buf[2048];
+  char buf[4096];
+  struct rxpd_file* self = NULL;
 
   // TODO better filenname validation / error handling
   if (!filename ||
       strchr (filename, '/') ||
-      strlen (filename) + strlen (base->rulesdir) > 2047)
-    abort();
+      strlen (filename) + strlen (base->rulesdir) > 4097)
+    return NULL;
 
   strcpy (buf, base->rulesdir);
   strcat (buf, filename);
   filename = strdup (buf);
-
-  struct rxpd_file* file = malloc (sizeof (struct rxpd_file));
-  if (!file || !filename) abort();
-
-  psplay_init (&file->node, filename);
-  llist_init (&file->rules);
-  
-  FILE* f = fopen (filename, "r");
-  // TODO error handling
-  if (!f) abort();
-
-  // TODO test excess line length = error
-  while (fgets (buf, 2048, f))
+  if (filename)
     {
-      size_t last = strlen(buf);
-      if (buf[last-1] == '\n')
-        buf[last-1] = '\0';
-
-      struct rxpd_rule* rule;
-      rule = rxpd_rule_new (buf);
-      if (!rule)
-        abort();
-
-
-      printf("%s\n", rule->string);
+      self = malloc (sizeof (struct rxpd_file));
+      if (self)
+        {
+          self->filename = filename;
+          const char* basename = strrchr (filename, '/');
+          if (basename)
+            ++basename;
+          else
+            basename = filename;
+          psplay_init (&self->node, basename);
+          llist_init (&self->rules);
 
-      llist_insert_tail (&file->rules, &rule->node);
+          psplay_insert (&base->files, &self->node);
+        }
     }
-
-  fclose (f);
-
-  psplay_insert (&base->files, &file->node);
-
-  return 0;
+  return self;
 }
 
 void
@@ -194,11 +184,44 @@ rxpd_file_delete (PSplay f)
           struct rxpd_rule* node = (struct rxpd_rule*)n;
           rxpd_rule_delete (node);
         }
-      free ((void*)file->node.key);
+      free ((void*)file->filename);
       free (f);
     }
 }
 
+int
+rxpd_file_load (struct rxpd_file* self)
+{
+  FILE* f = fopen (self->filename, "r");
+  // TODO error handling
+  if (f)
+    {
+      // TODO test excess line length = error
+      char buf[4096];
+
+      while (fgets (buf, 4096, f))
+        {
+          size_t last = strlen(buf);
+          if (buf[last-1] == '\n')
+            buf[last-1] = '\0';
+
+          struct rxpd_rule* rule;
+          rule = rxpd_rule_new (buf);
+          if (!rule)
+            abort();
+
+          printf("loaded rule '%s'\n", rule->string);
+
+          llist_insert_tail (&self->rules, &rule->node);
+        }
+
+      fclose (f);
+      return 1;
+    }
+  else
+    return 0;
+}
+
 int
 rxpd_file_cmp (const void* A, const void* B)
 {
@@ -227,14 +250,20 @@ rxpd_socket_new_tcp4 (struct rxpd_base* base, const char* addr, unsigned short p
   memset (&listen_addr, 0, sizeof (listen_addr));
 
   listen_addr.sin_family = AF_INET;
-  if (inet_aton (addr?addr:"0.0.0.0", &listen_addr.sin_addr) == 0)
-    abort();
+  if (addr)
+    {
+      if (inet_aton (addr, &listen_addr.sin_addr) == 0)
+        abort();
+    }
+  else
+    listen_addr.sin_addr.s_addr = INADDR_ANY;
+
   listen_addr.sin_port = htons(port);
 
   if (bind (self->fd, (struct sockaddr*)&listen_addr, sizeof (listen_addr)) == -1)
     abort();
 
-  static int yes=1;
+  static int yes = 1;
   if (setsockopt (self->fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1)
     abort ();
 
@@ -260,11 +289,11 @@ rxpd_socket_delete (struct rxpd_socket* self)
 }
 
 struct rxpd_socket*
-rxpd_socket_activate (struct rxpd_socket* self)
+rxpd_socket_schedule (struct rxpd_socket* self)
 {
   if (self)
     {
-      llist_insert_tail (&self->base->sockets_active, &self->node);
+      llist_insert_head (&self->base->sockets_active, &self->node);
       event_add (&self->ev, NULL);
     }
   return self;
@@ -286,14 +315,116 @@ rxpd_socket_accept (int fd, short event, void* ptr)
 {
   printf ("incoming connection\n");
 
-  struct rxpd_socket* sock = ptr;
+  struct rxpd_socket* self = ptr;
 
   struct rxpd_connection* conn =
-    rxpd_connection_new (sock->base, fd);
+    rxpd_connection_new (self->base, fd);
 
-  rxpd_connection_activate (conn);
+  rxpd_connection_schedule (conn);
+  rxpd_socket_schedule (self);
 }
 
+///
+
+struct rxpd_buffer*
+rxpd_buffer_init (struct rxpd_buffer* self, struct rxpd_connection* conn)
+{
+  self->conn = conn;
+  self->state = RXPD_OK;
+  self->eol = self->eob = self->buffer;
+  self->buffer [4095] = '\0';
+  return self;
+}
+
+
+char*
+rxpd_buffer_readline (struct rxpd_buffer* self, int again)
+{
+  int fd = self->conn->fd;
+
+  if (self->eol < self->eob)
+    {
+      //there was a line pending, shift buffer left
+      memmove (self->buffer, self->eol+1, self->eob - self->eol - 1);
+      self->eob = (char*)(self->eob - (self->eol - self->buffer + 1));
+      self->eol = self->buffer;
+      // TODO handle \r's
+    }
+
+
+  if (!again && self->state == RXPD_OK)   // we only read when again is 0, first iteration
+    {
+      ssize_t r = 0;
+      do
+        {
+          r = read(fd, self->eob, 4095 - (self->eob - self->buffer));
+        }
+      while (r == -1 && errno == EINTR);
+
+      if (r != -1)
+        {
+
+          if (r == 0)
+            {
+              shutdown (fd, SHUT_RD);
+              self->state = RXPD_EOF;
+            }
+
+          self->eob += r;
+        }
+      else
+        self->state = RXPD_ERROR;
+    }
+
+  // find next newline, terminate string there
+  for (char* i = self->buffer; i < self->eob; ++i)
+    {
+      if (*i == '\n')
+        {
+          *i = '\0';
+          self->eol = i;
+          break;
+        }
+    }
+
+  // TODO handle buffer overfulls
+
+  return (self->eob == self->buffer) ? NULL : self->buffer;
+}
+
+/*
+void
+rxpd_buffer_write(int fd, short event, void* ptr)
+{
+  struct rxpd_buffer* self = (struct rxpd_buffer*) ptr;
+
+  ssize_t n = write(int fd, const void *buf, size_t count);
+
+}
+*/
+
+int
+rxpd_buffer_printf (struct rxpd_buffer* self, const char* fmt, ...)
+{
+  // for now we do a blocking write, needs to be fixed some day
+  // add string to buffer
+  va_list ap;
+  va_start(ap, fmt);
+  //int sz = self->buffer+4096 - self->eob;
+  int n = vsnprintf (self->buffer, 4096, fmt, ap);
+  va_end(ap);
+
+  write (self->conn->fd, self->buffer, n);
+
+  if (n>4095)
+    return 0;
+
+  return 1;
+}
+
+
+
+
 ///
 
 struct rxpd_connection*
@@ -310,10 +441,15 @@ rxpd_connection_new (struct rxpd_base* base, int fd)
   if (self->fd == -1)
     abort ();
 
+  static int yes = 1;
+  if (setsockopt (self->fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1)
+    abort ();
+
   self->base = base;
   self->file = NULL;
 
-  self->eol = self->eob = self->buffer;
+  rxpd_buffer_init (&self->in, self);
+  rxpd_buffer_init (&self->out, self);
   
   event_set (&self->ev, self->fd, EV_READ, rxpd_connection_parse_cmd, self);
 
@@ -335,7 +471,7 @@ rxpd_connection_delete (struct rxpd_connection* self)
 }
 
 struct rxpd_connection*
-rxpd_connection_activate (struct rxpd_connection* self)
+rxpd_connection_schedule (struct rxpd_connection* self)
 {
   if (self)
     {
@@ -356,69 +492,277 @@ rxpd_connection_suspend (struct rxpd_connection* self)
   return self;
 }
 
+void
+rxpd_connection_parse_cmd (int fd, short event, void* ptr)
+{
+  printf ("parse cmd\n");
+  struct rxpd_connection* self = (struct rxpd_connection*) ptr;
+
+  char* line;
+  line = rxpd_buffer_readline (&self->in, 0);
 
+  if (!line)
+    {
+      rxpd_buffer_printf (&self->out, "#ERROR: no data\n");
+      close (fd);
+      return;
+    }
 
+  static const struct cmd_table
+  {
+    enum rxpd_cmd_e nr;
+    const char* cmd;
+    size_t sz;
+  } cmds[] =
+    {
+#define RXPD_CMD(cmd) {RXPD_CMD_##cmd, #cmd":", sizeof (#cmd)},
+      RXPD_COMMANDS
+#undef RXPD_CMD
+      {0, NULL, 0}
+    };
+
+  const struct cmd_table* i;
+  for (i = cmds; i->cmd; ++i)
+    if (strncmp (line, i->cmd, i->sz) == 0)
+      break;
+  if (!i->cmd)
+    {
+      rxpd_buffer_printf (&self->out, "#ERROR: no command\n");
+      rxpd_connection_delete (self);
+      return;
+    }
+  // TODO policy check here
 
-int
-rxpd_connection_readline (struct rxpd_connection* self)
-{
+  if (line[i->sz])
+    {
+      // rulename provided
+      self->file = (struct rxpd_file*) psplay_find (&self->base->files, &line[i->sz]);
+
+      if (!self->file)
+        {
+          // todo create policy?
+          self->file = rxpd_file_new (self->base, &line[i->sz]);
+          if (!self->file)
+            {
+              rxpd_buffer_printf (&self->out, "#ERROR: illegal rule\n");
+              rxpd_connection_delete (self);
+              return;
+            }
+        }
+    }
+
+  // dispatch
+  switch (i->nr)
+    {
+#define RXPD_CMD(cmd)                                                           \
+case RXPD_CMD_##cmd:                                                            \
+  event_set (&self->ev, self->fd, EV_READ, rxpd_connection_cmd_##cmd, self);    \
+  rxpd_connection_cmd_##cmd (fd, 0, ptr);                                       \
+  break;
+      RXPD_COMMANDS
+#undef RXPD_CMD
+    }
 }
 
+
+
 void
-rxpd_connection_parse_cmd (int sock, short event, void* ptr)
+rxpd_connection_cmd_CHECK (int fd, short event, void* ptr)
 {
-  printf ("parse cmd\n");
-  // TODO policy check
+  struct rxpd_connection* self = (struct rxpd_connection*) ptr;
+
+  if (event == EV_READ)
+    {
+      int again = -1;
+      char* line;
+      while (line = rxpd_buffer_readline (&self->in, ++again))
+        {
+          if (*line == '\0')
+            {
+              rxpd_buffer_printf (&self->out, "#OK:\n");
+            }
+          else
+            {
+              LLIST_FOREACH (&self->file->rules, n)
+                {
+                  struct rxpd_rule* rule = (struct rxpd_rule*)n;
+                  if (rule->string[0] != '#')
+                    {
+                      if (regexec (&rule->rx, line, 0, NULL, 0) == 0)
+                        {
+                          rxpd_buffer_printf (&self->out, "%s\n", rule->string);
+                          break;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+  if (rxpd_buffer_state (&self->in) == RXPD_OK)
+    rxpd_connection_schedule (self);
+  else
+    {
+      if (rxpd_buffer_state (&self->in) == RXPD_ERROR)
+        rxpd_buffer_printf (&self->out, "#ERROR:\n");
+      close (fd);
+    }
 }
 
-void
-rxpd_connection_cmd_CHECK (int sock, short event, void* ptr)
+
+
+static void
+rxpd_connection_APPEND_PREPEND_helper (int fd, short event, void* ptr, int do_append)
 {
+  struct rxpd_connection* self = (struct rxpd_connection*) ptr;
+
+  if (!event)
+    llist_init (&self->tmp_list);
+
+  if (event == EV_READ)
+    {
+      int again = -1;
+      char* line;
+
+      while (line = rxpd_buffer_readline (&self->in, ++again))
+        {
+          if (*line)
+            {
+              struct rxpd_rule* rule;
+              rule = rxpd_rule_new (line);
+              if (!rule)
+                abort();
+
+              llist_insert_tail (&self->tmp_list, &rule->node);
+            }
+          else goto finish;
+        }
+    }
+
+  if (rxpd_buffer_state (&self->in) == RXPD_OK)
+    rxpd_connection_schedule (self);
+  else
+    {
+      // TODO should also print error when any rule compilation failed
+      if (rxpd_buffer_state (&self->in) == RXPD_ERROR)
+        rxpd_buffer_printf (&self->out, "#ERROR:\n");
+      else
+        {
+        finish:
+          rxpd_buffer_printf (&self->out, "#OK:\n");
+        }
+
+      if (do_append)
+        llist_insertlist_prev (&self->file->rules, &self->tmp_list);
+      else
+        llist_insertlist_next (&self->file->rules, &self->tmp_list);
+      close (fd);
+    }
 }
 
 void
-rxpd_connection_cmd_APPEND (int sock, short event, void* ptr)
+rxpd_connection_cmd_APPEND (int fd, short event, void* ptr)
 {
+  rxpd_connection_APPEND_PREPEND_helper (fd, event, ptr, 1);
 }
 
 void
-rxpd_connection_cmd_PREPEND (int sock, short event, void* ptr)
+rxpd_connection_cmd_PREPEND (int fd, short event, void* ptr)
 {
+  rxpd_connection_APPEND_PREPEND_helper (fd, event, ptr, 0);
 }
 
 void
-rxpd_connection_cmd_REMOVE (int sock, short event, void* ptr)
+rxpd_connection_cmd_REMOVE (int fd, short event, void* ptr)
 {
+  struct rxpd_connection* self = (struct rxpd_connection*) ptr;
+  rxpd_buffer_printf (&self->out, "#ERROR: unimplemented command %s\n", &__func__[20]);
 }
 
 void
-rxpd_connection_cmd_REPLACE (int sock, short event, void* ptr)
+rxpd_connection_cmd_REPLACE (int fd, short event, void* ptr)
 {
+  struct rxpd_connection* self = (struct rxpd_connection*) ptr;
+  rxpd_buffer_printf (&self->out, "#ERROR: unimplemented command %s\n", &__func__[20]);
 }
 
 void
-rxpd_connection_cmd_LOAD (int sock, short event, void* ptr)
+rxpd_connection_cmd_LOAD (int fd, short event, void* ptr)
 {
+  struct rxpd_connection* self = (struct rxpd_connection*) ptr;
+  rxpd_buffer_printf (&self->out, "#ERROR: unimplemented command %s\n", &__func__[20]);
 }
 
 void
-rxpd_connection_cmd_SAVE (int sock, short event, void* ptr)
+rxpd_connection_cmd_SAVE (int fd, short event, void* ptr)
 {
+  struct rxpd_connection* self = (struct rxpd_connection*) ptr;
+  rxpd_buffer_printf (&self->out, "#ERROR: unimplemented command %s\n", &__func__[20]);
 }
 
 void
-rxpd_connection_cmd_DUMP (int sock, short event, void* ptr)
+rxpd_connection_cmd_DUMP (int fd, short event, void* ptr)
 {
+  struct rxpd_connection* self = (struct rxpd_connection*) ptr;
+
+  if (llist_is_empty (&self->file->rules))
+    rxpd_buffer_printf (&self->out, "#OK:\n");
+  else
+    {
+      LLIST_FOREACH (&self->file->rules, n)
+        {
+          struct rxpd_rule* rule = (struct rxpd_rule*)n;
+          rxpd_buffer_printf (&self->out, "%s\n", rule->string);
+        }
+    }
+
+  close (fd);
 }
 
-void
-rxpd_connection_cmd_LIST (int sock, short event, void* ptr)
+
+static psplay_delete_t
+walk_LIST (PSplay node, const enum psplay_order_e which, int level, void* data)
 {
+  (void) level;
+  struct rxpd_file* file = (struct rxpd_file*) node;
+  struct rxpd_connection* conn = (struct rxpd_connection*) data;
+
+  if (which == PSPLAY_INORDER)
+    rxpd_buffer_printf (&conn->out, "%s\n", file->node.key);
+
+  return PSPLAY_CONT;
 }
 
+
 void
-rxpd_connection_cmd_END (int sock, short event, void* ptr)
+rxpd_connection_cmd_LIST (int fd, short event, void* ptr)
 {
+  struct rxpd_connection* self = (struct rxpd_connection*) ptr;
+
+  if (psplay_isempty_root (&self->base->files))
+    rxpd_buffer_printf (&self->out, "#OK:\n");
+  else
+    psplay_walk (&self->base->files, NULL, walk_LIST, 0, ptr);
+
+  close (fd);
 }
 
+void
+rxpd_connection_cmd_SHUTDOWN (int fd, short event, void* ptr)
+{
+  struct rxpd_connection* self = (struct rxpd_connection*) ptr;
+  // destroy all sockets
+  LLIST_WHILE_HEAD (&self->base->sockets_pending, n)
+    {
+      struct rxpd_socket* socket = (struct rxpd_socket*)n;
+      rxpd_socket_delete (socket);
+    }
+  LLIST_WHILE_HEAD (&self->base->sockets_active, n)
+    {
+      struct rxpd_socket* socket = (struct rxpd_socket*)n;
+      rxpd_socket_delete (socket);
+    }
 
+  rxpd_buffer_printf (&self->out, "#OK:\n");
+  close (fd);
+}