From 7f7330301157778112c5ce3569edf7ef1a625e63 Mon Sep 17 00:00:00 2001 From: Ralph Amissah Date: Tue, 13 Oct 2020 13:40:10 -0400 Subject: cgi.d arsd update --- misc/ext_lib/src/arsd/README | 1 + misc/ext_lib/src/arsd/cgi.d | 1806 ++++++++++++++++++++++++++++-------------- 2 files changed, 1205 insertions(+), 602 deletions(-) create mode 100644 misc/ext_lib/src/arsd/README diff --git a/misc/ext_lib/src/arsd/README b/misc/ext_lib/src/arsd/README new file mode 100644 index 0000000..c52641e --- /dev/null +++ b/misc/ext_lib/src/arsd/README @@ -0,0 +1 @@ +aria2c https://raw.githubusercontent.com/adamdruppe/arsd/master/cgi.d diff --git a/misc/ext_lib/src/arsd/cgi.d b/misc/ext_lib/src/arsd/cgi.d index caee996..1afd32b 100644 --- a/misc/ext_lib/src/arsd/cgi.d +++ b/misc/ext_lib/src/arsd/cgi.d @@ -1,6 +1,8 @@ // FIXME: if an exception is thrown, we shouldn't necessarily cache... // FIXME: there's some annoying duplication of code in the various versioned mains +// add the Range header in there too. should return 206 + // FIXME: cgi per-request arena allocator // i need to add a bunch of type templates for validations... mayne @NotNull or NotNull! @@ -358,6 +360,26 @@ version(Posix) { } } +void cloexec(int fd) { + version(Posix) { + import core.sys.posix.fcntl; + fcntl(fd, F_SETFD, FD_CLOEXEC); + } +} + +void cloexec(Socket s) { + version(Posix) { + import core.sys.posix.fcntl; + fcntl(s.handle, F_SETFD, FD_CLOEXEC); + } +} + +version(embedded_httpd_hybrid) { + version=embedded_httpd_threads; + version=cgi_use_fork; + version=cgi_use_fiber; +} + // the servers must know about the connections to talk to them; the interfaces are vital version(with_addon_servers) version=with_addon_servers_connections; @@ -1248,6 +1270,8 @@ class Cgi { // multipart/form-data + // FIXME: this might want to be factored out and factorized + // need to make sure the stream hooks actually work. void pieceHasNewContent() { // we just grew the piece's buffer. Do we have to switch to file backing? if(pps.piece.contentInMemory) { @@ -1597,14 +1621,22 @@ class Cgi { } auto ira = ir.source.remoteAddress(); + auto irLocalAddress = ir.source.localAddress(); + + ushort port = 80; + if(auto ia = cast(InternetAddress) irLocalAddress) { + port = ia.port; + } else if(auto ia = cast(Internet6Address) irLocalAddress) { + port = ia.port; + } // that check for UnixAddress is to work around a Phobos bug // see: https://github.com/dlang/phobos/pull/7383 // but this might be more useful anyway tbh for this case version(Posix) - this(ir, cast(UnixAddress) ira ? "unix:" : ira.toString(), 80 /* FIXME */, 0, false, &rdo, null, closeConnection); + this(ir, cast(UnixAddress) ira ? "unix:" : ira.toString(), port, 0, false, &rdo, null, closeConnection); else - this(ir, ira.toString(), 80 /* FIXME */, 0, false, &rdo, null, closeConnection); + this(ir, ira.toString(), port, 0, false, &rdo, null, closeConnection); } /** @@ -1945,11 +1977,17 @@ class Cgi { string getCurrentCompleteUri() const { ushort defaultPort = https ? 443 : 80; - return format("http%s://%s%s%s", - https ? "s" : "", - host, - (!port || port == defaultPort) ? "" : ":" ~ to!string(port), - requestUri); + string uri = "http"; + if(https) + uri ~= "s"; + uri ~= "://"; + uri ~= host; + if(!(!port || port == defaultPort)) { + uri ~= ":"; + uri ~= to!string(port); + } + uri ~= requestUri; + return uri; } /// You can override this if your site base url isn't the same as the script name @@ -2098,110 +2136,114 @@ class Cgi { private bool websocketMode; void flushHeaders(const(void)[] t, bool isAll = false) { - string[] hd; - // Flush the headers + StackBuffer buffer = StackBuffer(0); + + prepHeaders(t, isAll, &buffer); + + if(rawDataOutput !is null) + rawDataOutput(cast(const(ubyte)[]) buffer.get()); + else { + stdout.rawWrite(buffer.get()); + } + } + + private void prepHeaders(const(void)[] t, bool isAll, StackBuffer* buffer) { + string terminator = "\n"; + if(rawDataOutput !is null) + terminator = "\r\n"; + if(responseStatus !is null) { if(nph) { if(http10) - hd ~= "HTTP/1.0 " ~ responseStatus; + buffer.add("HTTP/1.0 ", responseStatus, terminator); else - hd ~= "HTTP/1.1 " ~ responseStatus; + buffer.add("HTTP/1.1 ", responseStatus, terminator); } else - hd ~= "Status: " ~ responseStatus; + buffer.add("Status: ", responseStatus, terminator); } else if (nph) { if(http10) - hd ~= "HTTP/1.0 200 OK"; + buffer.add("HTTP/1.0 200 OK", terminator); else - hd ~= "HTTP/1.1 200 OK"; + buffer.add("HTTP/1.1 200 OK", terminator); } if(websocketMode) goto websocket; if(nph) { // we're responsible for setting the date too according to http 1.1 - hd ~= "Date: " ~ printDate(cast(DateTime) Clock.currTime(UTC())); + char[29] db = void; + printDateToBuffer(cast(DateTime) Clock.currTime(UTC()), db[]); + buffer.add("Date: ", db[], terminator); } // FIXME: what if the user wants to set his own content-length? // The custom header function can do it, so maybe that's best. // Or we could reuse the isAll param. if(responseLocation !is null) { - hd ~= "Location: " ~ responseLocation; + buffer.add("Location: ", responseLocation, terminator); } if(!noCache && responseExpires != long.min) { // an explicit expiration date is set if(responseExpiresRelative) { - hd ~= "Cache-Control: "~(responseIsPublic ? "public" : "private")~", max-age="~to!string(responseExpires)~", no-cache=\"set-cookie, set-cookie2\""; + buffer.add("Cache-Control: ", responseIsPublic ? "public" : "private", ", max-age="); + buffer.add(responseExpires); + buffer.add(", no-cache=\"set-cookie, set-cookie2\"", terminator); } else { auto expires = SysTime(unixTimeToStdTime(cast(int)(responseExpires / 1000)), UTC()); - hd ~= "Expires: " ~ printDate( - cast(DateTime) expires); + char[29] db = void; + printDateToBuffer(cast(DateTime) expires, db[]); + buffer.add("Expires: ", db[], terminator); // FIXME: assuming everything is private unless you use nocache - generally right for dynamic pages, but not necessarily - hd ~= "Cache-Control: "~(responseIsPublic ? "public" : "private")~", no-cache=\"set-cookie, set-cookie2\""; + buffer.add("Cache-Control: ", (responseIsPublic ? "public" : "private"), ", no-cache=\"set-cookie, set-cookie2\""); + buffer.add(terminator); } } if(responseCookies !is null && responseCookies.length > 0) { foreach(c; responseCookies) - hd ~= "Set-Cookie: " ~ c; + buffer.add("Set-Cookie: ", c, terminator); } if(noCache) { // we specifically do not want caching (this is actually the default) - hd ~= "Cache-Control: private, no-cache=\"set-cookie\""; - hd ~= "Expires: 0"; - hd ~= "Pragma: no-cache"; + buffer.add("Cache-Control: private, no-cache=\"set-cookie\"", terminator); + buffer.add("Expires: 0", terminator); + buffer.add("Pragma: no-cache", terminator); } else { if(responseExpires == long.min) { // caching was enabled, but without a date set - that means assume cache forever - hd ~= "Cache-Control: public"; - hd ~= "Expires: Tue, 31 Dec 2030 14:00:00 GMT"; // FIXME: should not be more than one year in the future + buffer.add("Cache-Control: public", terminator); + buffer.add("Expires: Tue, 31 Dec 2030 14:00:00 GMT", terminator); // FIXME: should not be more than one year in the future } } if(responseContentType !is null) { - hd ~= "Content-Type: " ~ responseContentType; + buffer.add("Content-Type: ", responseContentType, terminator); } else - hd ~= "Content-Type: text/html; charset=utf-8"; + buffer.add("Content-Type: text/html; charset=utf-8", terminator); if(gzipResponse && acceptsGzip && isAll) { // FIXME: isAll really shouldn't be necessary - hd ~= "Content-Encoding: gzip"; + buffer.add("Content-Encoding: gzip", terminator); } if(!isAll) { if(nph && !http10) { - hd ~= "Transfer-Encoding: chunked"; + buffer.add("Transfer-Encoding: chunked", terminator); responseChunked = true; } } else { - hd ~= "Content-Length: " ~ to!string(t.length); + buffer.add("Content-Length: "); + buffer.add(t.length); + buffer.add(terminator); if(nph && keepAliveRequested) { - hd ~= "Connection: Keep-Alive"; + buffer.add("Connection: Keep-Alive", terminator); } } websocket: - if(customHeaders !is null) - hd ~= customHeaders; + + foreach(hd; customHeaders) + buffer.add(hd, terminator); // FIXME: what about duplicated headers? - foreach(h; hd) { - if(rawDataOutput !is null) - rawDataOutput(cast(const(ubyte)[]) (h ~ "\r\n")); - else { - version(CRuntime_Musl) { - stdout.rawWrite(h); - stdout.rawWrite("\n"); - } else { - writeln(h); - } - } - } - if(rawDataOutput !is null) - rawDataOutput(cast(const(ubyte)[]) ("\r\n")); - else { - version(CRuntime_Musl) { - stdout.rawWrite("\n"); - } else { - writeln(""); - } - } + // end of header indicator + buffer.add(terminator); outputtedResponseData = true; } @@ -2210,6 +2252,8 @@ class Cgi { void write(const(void)[] t, bool isAll = false, bool maybeAutoClose = true) { assert(!closed, "Output has already been closed"); + StackBuffer buffer = StackBuffer(0); + if(gzipResponse && acceptsGzip && isAll) { // FIXME: isAll really shouldn't be necessary // actually gzip the data here @@ -2224,11 +2268,11 @@ class Cgi { } if(!outputtedResponseData && (!autoBuffer || isAll)) { - flushHeaders(t, isAll); + prepHeaders(t, isAll, &buffer); } if(requestMethod != RequestMethod.HEAD && t.length > 0) { - if (autoBuffer) { + if (autoBuffer && !isAll) { outputBuffer ~= cast(ubyte[]) t; } if(!autoBuffer || isAll) { @@ -2237,20 +2281,22 @@ class Cgi { //rawDataOutput(makeChunk(cast(const(ubyte)[]) t)); // we're making the chunk here instead of in a function // to avoid unneeded gc pressure - rawDataOutput(cast(const(ubyte)[]) toHex(t.length)); - rawDataOutput(cast(const(ubyte)[]) "\r\n"); - rawDataOutput(cast(const(ubyte)[]) t); - rawDataOutput(cast(const(ubyte)[]) "\r\n"); - - + buffer.add(toHex(t.length)); + buffer.add("\r\n"); + buffer.add(cast(char[]) t, "\r\n"); } else { - rawDataOutput(cast(const(ubyte)[]) t); + buffer.add(cast(char[]) t); } else - stdout.rawWrite(t); + buffer.add(cast(char[]) t); } } + if(rawDataOutput !is null) + rawDataOutput(cast(const(ubyte)[]) buffer.get()); + else + stdout.rawWrite(buffer.get()); + if(maybeAutoClose && isAll) close(); // if you say it is all, that means we're definitely done // maybeAutoClose can be false though to avoid this (important if you call from inside close()! @@ -3187,7 +3233,8 @@ mixin template CustomCgiMain(CustomCgi, alias fun, long maxContentLength = defau version(embedded_httpd_processes) int processPoolSize = 8; -void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)(string[] args) if(is(CustomCgi : Cgi)) { +// Returns true if run. You should exit the program after that. +bool tryAddonServers(string[] args) { if(args.length > 1) { // run the special separate processes if needed switch(args[1]) { @@ -3195,40 +3242,44 @@ void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxC version(with_addon_servers) runWebsocketServer(); else - printf("Add-on servers not compiled in."); - return; + printf("Add-on servers not compiled in.\n"); + return true; case "--session-server": version(with_addon_servers) runSessionServer(); else - printf("Add-on servers not compiled in."); - return; + printf("Add-on servers not compiled in.\n"); + return true; case "--event-server": version(with_addon_servers) runEventServer(); else - printf("Add-on servers not compiled in."); - return; + printf("Add-on servers not compiled in.\n"); + return true; case "--timer-server": version(with_addon_servers) runTimerServer(); else - printf("Add-on servers not compiled in."); - return; + printf("Add-on servers not compiled in.\n"); + return true; case "--timed-jobs": import core.demangle; version(with_addon_servers_connections) foreach(k, v; scheduledJobHandlers) writeln(k, "\t", demangle(k)); - return; + return true; case "--timed-job": scheduledJobHandlers[args[2]](args[3 .. $]); - return; + return true; default: // intentionally blank - do nothing and carry on to run normally } } + return false; +} +/// Tries to simulate a request from the command line. Returns true if it does, false if it didn't find the args. +bool trySimulatedRequest(alias fun, CustomCgi = Cgi)(string[] args) if(is(CustomCgi : Cgi)) { // we support command line thing for easy testing everywhere // it needs to be called ./app method uri [other args...] if(args.length >= 3 && isCgiRequestMethod(args[1])) { @@ -3236,427 +3287,658 @@ void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxC scope(exit) cgi.dispose(); fun(cgi); cgi.close(); - return; + return true; } + return false; +} +/++ + A server control and configuration struct, as a potential alternative to calling [GenericMain] or [cgiMainImpl]. See the source of [cgiMainImpl] to an example of how you can use it. - ushort listeningPort(ushort def) { - bool found = false; - foreach(arg; args) { - if(found) - return to!ushort(arg); - if(arg == "--port" || arg == "-p" || arg == "/port" || arg == "--listening-port") - found = true; - } - return def; + History: + Added Sept 26, 2020 (release version 8.5). ++/ +struct RequestServer { + /// + string listeningHost; + /// + ushort listeningPort = defaultListeningPort(); + + /// + this(string defaultHost, ushort defaultPort) { + this.listeningHost = defaultHost; + this.listeningPort = defaultPort; + } + + /// + this(ushort defaultPort) { + listeningPort = defaultPort; } - string listeningHost() { - bool found = false; + /// Reads the args into the other values. + void configureFromCommandLine(string[] args) { + bool foundPort = false; + bool foundHost = false; foreach(arg; args) { - if(found) - return arg; + if(foundPort) { + listeningPort = to!ushort(arg); + foundPort = false; + } + if(foundHost) { + listeningHost = arg; + foundHost = false; + } if(arg == "--listening-host" || arg == "-h" || arg == "/listening-host") - found = true; + foundHost = true; + else if(arg == "--port" || arg == "-p" || arg == "/port" || arg == "--listening-port") + foundPort = true; } - return ""; } - version(netman_httpd) { - import arsd.httpd; - // what about forwarding the other constructor args? - // this probably needs a whole redoing... - serveHttp!CustomCgi(&fun, listeningPort(8080));//5005); - return; - } else - version(embedded_httpd_processes) { - import core.sys.posix.unistd; - import core.sys.posix.sys.socket; - import core.sys.posix.netinet.in_; - //import std.c.linux.socket; - int sock = socket(AF_INET, SOCK_STREAM, 0); - if(sock == -1) - throw new Exception("socket"); + /++ + Serves a single request on this thread, with an embedded server, then stops. Designed for cases like embedded oauth responders - { - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(listeningPort(8085)); - auto lh = listeningHost(); - if(lh.length) { - if(inet_pton(AF_INET, lh.toStringz(), &addr.sin_addr.s_addr) != 1) - throw new Exception("bad listening host given, please use an IP address.\nExample: --listening-host 127.0.0.1 means listen only on Localhost.\nExample: --listening-host 0.0.0.0 means listen on all interfaces.\nOr you can pass any other single numeric IPv4 address."); - } else - addr.sin_addr.s_addr = INADDR_ANY; + History: + Added Oct 10, 2020. + Example: - // HACKISH - int on = 1; - setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, on.sizeof); - // end hack + --- + import arsd.cgi; + void main() { + RequestServer server = RequestServer("127.0.0.1", 6789); + string oauthCode; + string oauthScope; + server.serveOnce!((cgi) { + oauthCode = cgi.request("code"); + oauthScope = cgi.request("scope"); + cgi.write("Thank you, please return to the application."); + }); + // use the code and scope given + } + --- + +/ + void serveOnce(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)() { + import std.socket; - - if(bind(sock, cast(sockaddr*) &addr, addr.sizeof) == -1) { - close(sock); - throw new Exception("bind"); - } + bool tcp; + void delegate() cleanup; + auto socket = startListening(listeningHost, listeningPort, tcp, cleanup, 1); + auto connection = socket.accept(); + doThreadHttpConnectionGuts!(CustomCgi, fun, true)(connection); - // FIXME: if this queue is full, it will just ignore it - // and wait for the client to retransmit it. This is an - // obnoxious timeout condition there. - if(sock.listen(128) == -1) { - close(sock); - throw new Exception("listen"); - } + if(cleanup) + cleanup(); + } + + /++ + Starts serving requests according to the current configuration. + +/ + void serve(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)() { + version(netman_httpd) { + // Obsolete! + + import arsd.httpd; + // what about forwarding the other constructor args? + // this probably needs a whole redoing... + serveHttp!CustomCgi(&fun, listeningPort);//5005); + return; + } else + version(embedded_httpd_processes) { + serveEmbeddedHttpdProcesses!(fun, CustomCgi)(this); + } else + version(embedded_httpd_threads) { + auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadHttpConnection!(CustomCgi, fun)); + manager.listen(); + } else + version(scgi) { + auto manager = new ListeningConnectionManager(listeningHost, listeningPort, &doThreadScgiConnection!(CustomCgi, fun, maxContentLength)); + manager.listen(); + } else + version(fastcgi) { + serveFastCgi!(fun, CustomCgi, maxContentLength)(this); + } else { + //version=plain_cgi; + handleCgiRequest!(fun, CustomCgi, maxContentLength)(); } + } - version(embedded_httpd_processes_accept_after_fork) {} else { - int pipeReadFd; - int pipeWriteFd; + void stop() { - { - int[2] pipeFd; - if(socketpair(AF_UNIX, SOCK_DGRAM, 0, pipeFd)) { - import core.stdc.errno; - throw new Exception("pipe failed " ~ to!string(errno)); - } + } +} - pipeReadFd = pipeFd[0]; - pipeWriteFd = pipeFd[1]; - } +version(embedded_httpd_processes) +void serveEmbeddedHttpdProcesses(alias fun, CustomCgi = Cgi)(RequestServer params) { + import core.sys.posix.unistd; + import core.sys.posix.sys.socket; + import core.sys.posix.netinet.in_; + //import std.c.linux.socket; + + int sock = socket(AF_INET, SOCK_STREAM, 0); + if(sock == -1) + throw new Exception("socket"); + + cloexec(sock); + + { + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(params.listeningPort); + auto lh = params.listeningHost; + if(lh.length) { + if(inet_pton(AF_INET, lh.toStringz(), &addr.sin_addr.s_addr) != 1) + throw new Exception("bad listening host given, please use an IP address.\nExample: --listening-host 127.0.0.1 means listen only on Localhost.\nExample: --listening-host 0.0.0.0 means listen on all interfaces.\nOr you can pass any other single numeric IPv4 address."); + } else + addr.sin_addr.s_addr = INADDR_ANY; + + // HACKISH + int on = 1; + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, on.sizeof); + // end hack + + + if(bind(sock, cast(sockaddr*) &addr, addr.sizeof) == -1) { + close(sock); + throw new Exception("bind"); } + // FIXME: if this queue is full, it will just ignore it + // and wait for the client to retransmit it. This is an + // obnoxious timeout condition there. + if(sock.listen(128) == -1) { + close(sock); + throw new Exception("listen"); + } + } - int processCount; - pid_t newPid; - reopen: - while(processCount < processPoolSize) { - newPid = fork(); - if(newPid == 0) { - // start serving on the socket - //ubyte[4096] backingBuffer; - for(;;) { - bool closeConnection; - uint i; - sockaddr addr; - i = addr.sizeof; - version(embedded_httpd_processes_accept_after_fork) { - int s = accept(sock, &addr, &i); - int opt = 1; - import core.sys.posix.netinet.tcp; - // the Cgi class does internal buffering, so disabling this - // helps with latency in many cases... - setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &opt, opt.sizeof); - } else { - int s; - auto readret = read_fd(pipeReadFd, &s, s.sizeof, &s); - if(readret != s.sizeof) { - import core.stdc.errno; - throw new Exception("pipe read failed " ~ to!string(errno)); - } + version(embedded_httpd_processes_accept_after_fork) {} else { + int pipeReadFd; + int pipeWriteFd; - //writeln("process ", getpid(), " got socket ", s); + { + int[2] pipeFd; + if(socketpair(AF_UNIX, SOCK_DGRAM, 0, pipeFd)) { + import core.stdc.errno; + throw new Exception("pipe failed " ~ to!string(errno)); + } + + pipeReadFd = pipeFd[0]; + pipeWriteFd = pipeFd[1]; + } + } + + + int processCount; + pid_t newPid; + reopen: + while(processCount < processPoolSize) { + newPid = fork(); + if(newPid == 0) { + // start serving on the socket + //ubyte[4096] backingBuffer; + for(;;) { + bool closeConnection; + uint i; + sockaddr addr; + i = addr.sizeof; + version(embedded_httpd_processes_accept_after_fork) { + int s = accept(sock, &addr, &i); + int opt = 1; + import core.sys.posix.netinet.tcp; + // the Cgi class does internal buffering, so disabling this + // helps with latency in many cases... + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &opt, opt.sizeof); + cloexec(s); + } else { + int s; + auto readret = read_fd(pipeReadFd, &s, s.sizeof, &s); + if(readret != s.sizeof) { + import core.stdc.errno; + throw new Exception("pipe read failed " ~ to!string(errno)); } - try { + //writeln("process ", getpid(), " got socket ", s); + } - if(s == -1) - throw new Exception("accept"); + try { - scope(failure) close(s); - //ubyte[__traits(classInstanceSize, BufferedInputRange)] bufferedRangeContainer; - auto ir = new BufferedInputRange(s); - //auto ir = emplace!BufferedInputRange(bufferedRangeContainer, s, backingBuffer); + if(s == -1) + throw new Exception("accept"); - while(!ir.empty) { - ubyte[__traits(classInstanceSize, CustomCgi)] cgiContainer; + scope(failure) close(s); + //ubyte[__traits(classInstanceSize, BufferedInputRange)] bufferedRangeContainer; + auto ir = new BufferedInputRange(s); + //auto ir = emplace!BufferedInputRange(bufferedRangeContainer, s, backingBuffer); - Cgi cgi; - try { - cgi = new CustomCgi(ir, &closeConnection); - cgi._outputFileHandle = s; - // if we have a single process and the browser tries to leave the connection open while concurrently requesting another, it will block everything an deadlock since there's no other server to accept it. By closing after each request in this situation, it tells the browser to serialize for us. - if(processPoolSize <= 1) - closeConnection = true; - //cgi = emplace!CustomCgi(cgiContainer, ir, &closeConnection); - } catch(Throwable t) { - // a construction error is either bad code or bad request; bad request is what it should be since this is bug free :P - // anyway let's kill the connection - version(CRuntime_Musl) { - // LockingTextWriter fails here - // so working around it - auto estr = t.toString(); - stderr.rawWrite(estr); - stderr.rawWrite("\n"); - } else - stderr.writeln(t.toString()); - sendAll(ir.source, plainHttpError(false, "400 Bad Request", t)); - closeConnection = true; - break; - } - assert(cgi !is null); - scope(exit) - cgi.dispose(); + while(!ir.empty) { + //ubyte[__traits(classInstanceSize, CustomCgi)] cgiContainer; - try { - fun(cgi); - cgi.close(); - if(cgi.websocketMode) - closeConnection = true; - } catch(ConnectionException ce) { + Cgi cgi; + try { + cgi = new CustomCgi(ir, &closeConnection); + cgi._outputFileHandle = s; + // if we have a single process and the browser tries to leave the connection open while concurrently requesting another, it will block everything an deadlock since there's no other server to accept it. By closing after each request in this situation, it tells the browser to serialize for us. + if(processPoolSize <= 1) closeConnection = true; - } catch(Throwable t) { - // a processing error can be recovered from - version(CRuntime_Musl) { - // LockingTextWriter fails here - // so working around it - auto estr = t.toString(); - stderr.rawWrite(estr); - } else { - stderr.writeln(t.toString); - } - if(!handleException(cgi, t)) - closeConnection = true; + //cgi = emplace!CustomCgi(cgiContainer, ir, &closeConnection); + } catch(Throwable t) { + // a construction error is either bad code or bad request; bad request is what it should be since this is bug free :P + // anyway let's kill the connection + version(CRuntime_Musl) { + // LockingTextWriter fails here + // so working around it + auto estr = t.toString(); + stderr.rawWrite(estr); + stderr.rawWrite("\n"); + } else + stderr.writeln(t.toString()); + sendAll(ir.source, plainHttpError(false, "400 Bad Request", t)); + closeConnection = true; + break; + } + assert(cgi !is null); + scope(exit) + cgi.dispose(); + + try { + fun(cgi); + cgi.close(); + if(cgi.websocketMode) + closeConnection = true; + } catch(ConnectionException ce) { + closeConnection = true; + } catch(Throwable t) { + // a processing error can be recovered from + version(CRuntime_Musl) { + // LockingTextWriter fails here + // so working around it + auto estr = t.toString(); + stderr.rawWrite(estr); + } else { + stderr.writeln(t.toString); } + if(!handleException(cgi, t)) + closeConnection = true; + } - if(closeConnection) { + if(closeConnection) { + ir.source.close(); + break; + } else { + if(!ir.empty) + ir.popFront(); // get the next + else if(ir.sourceClosed) { ir.source.close(); - break; - } else { - if(!ir.empty) - ir.popFront(); // get the next - else if(ir.sourceClosed) { - ir.source.close(); - } } } + } - ir.source.close(); - } catch(Throwable t) { - version(CRuntime_Musl) {} else + ir.source.close(); + } catch(Throwable t) { + version(CRuntime_Musl) {} else debug writeln(t); - // most likely cause is a timeout - } + // most likely cause is a timeout } - } else { - processCount++; } + } else { + processCount++; } + } - // the parent should wait for its children... - if(newPid) { - import core.sys.posix.sys.wait; - - version(embedded_httpd_processes_accept_after_fork) {} else { - import core.sys.posix.sys.select; - int[] fdQueue; - while(true) { - // writeln("select call"); - int nfds = pipeWriteFd; - if(sock > pipeWriteFd) - nfds = sock; - nfds += 1; - fd_set read_fds; - fd_set write_fds; - FD_ZERO(&read_fds); - FD_ZERO(&write_fds); - FD_SET(sock, &read_fds); - if(fdQueue.length) - FD_SET(pipeWriteFd, &write_fds); - auto ret = select(nfds, &read_fds, &write_fds, null, null); - if(ret == -1) { - import core.stdc.errno; - if(errno == EINTR) - goto try_wait; - else - throw new Exception("wtf select"); - } + // the parent should wait for its children... + if(newPid) { + import core.sys.posix.sys.wait; - int s = -1; - if(FD_ISSET(sock, &read_fds)) { - uint i; - sockaddr addr; - i = addr.sizeof; - s = accept(sock, &addr, &i); - import core.sys.posix.netinet.tcp; - int opt = 1; - setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &opt, opt.sizeof); - } + version(embedded_httpd_processes_accept_after_fork) {} else { + import core.sys.posix.sys.select; + int[] fdQueue; + while(true) { + // writeln("select call"); + int nfds = pipeWriteFd; + if(sock > pipeWriteFd) + nfds = sock; + nfds += 1; + fd_set read_fds; + fd_set write_fds; + FD_ZERO(&read_fds); + FD_ZERO(&write_fds); + FD_SET(sock, &read_fds); + if(fdQueue.length) + FD_SET(pipeWriteFd, &write_fds); + auto ret = select(nfds, &read_fds, &write_fds, null, null); + if(ret == -1) { + import core.stdc.errno; + if(errno == EINTR) + goto try_wait; + else + throw new Exception("wtf select"); + } - if(FD_ISSET(pipeWriteFd, &write_fds)) { - if(s == -1 && fdQueue.length) { - s = fdQueue[0]; - fdQueue = fdQueue[1 .. $]; // FIXME reuse buffer - } - write_fd(pipeWriteFd, &s, s.sizeof, s); - close(s); // we are done with it, let the other process take ownership - } else - fdQueue ~= s; + int s = -1; + if(FD_ISSET(sock, &read_fds)) { + uint i; + sockaddr addr; + i = addr.sizeof; + s = accept(sock, &addr, &i); + cloexec(s); + import core.sys.posix.netinet.tcp; + int opt = 1; + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &opt, opt.sizeof); } + + if(FD_ISSET(pipeWriteFd, &write_fds)) { + if(s == -1 && fdQueue.length) { + s = fdQueue[0]; + fdQueue = fdQueue[1 .. $]; // FIXME reuse buffer + } + write_fd(pipeWriteFd, &s, s.sizeof, s); + close(s); // we are done with it, let the other process take ownership + } else + fdQueue ~= s; } + } - try_wait: + try_wait: - int status; - while(-1 != wait(&status)) { - version(CRuntime_Musl) {} else { + int status; + while(-1 != wait(&status)) { + version(CRuntime_Musl) {} else { import std.stdio; writeln("Process died ", status); - } - processCount--; - goto reopen; } - close(sock); + processCount--; + goto reopen; } - } else - version(embedded_httpd_threads) { - auto manager = new ListeningConnectionManager(listeningHost(), listeningPort(8085), &doThreadHttpConnection!(CustomCgi, fun)); - manager.listen(); - } else - version(scgi) { - import std.exception; - import al = std.algorithm; - auto manager = new ListeningConnectionManager(listeningHost(), listeningPort(4000), &doThreadScgiConnection!(CustomCgi, fun, maxContentLength)); - manager.listen(); - } else - version(fastcgi) { - // SetHandler fcgid-script - FCGX_Stream* input, output, error; - FCGX_ParamArray env; + close(sock); + } +} +version(fastcgi) +void serveFastCgi(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)(RequestServer params) { + // SetHandler fcgid-script + FCGX_Stream* input, output, error; + FCGX_ParamArray env; - const(ubyte)[] getFcgiChunk() { - const(ubyte)[] ret; - while(FCGX_HasSeenEOF(input) != -1) - ret ~= cast(ubyte) FCGX_GetChar(input); - return ret; - } - void writeFcgi(const(ubyte)[] data) { - FCGX_PutStr(data.ptr, data.length, output); - } - - void doARequest() { - string[string] fcgienv; + const(ubyte)[] getFcgiChunk() { + const(ubyte)[] ret; + while(FCGX_HasSeenEOF(input) != -1) + ret ~= cast(ubyte) FCGX_GetChar(input); + return ret; + } - for(auto e = env; e !is null && *e !is null; e++) { - string cur = to!string(*e); - auto idx = cur.indexOf("="); - string name, value; - if(idx == -1) - name = cur; - else { - name = cur[0 .. idx]; - value = cur[idx + 1 .. $]; - } + void writeFcgi(const(ubyte)[] data) { + FCGX_PutStr(data.ptr, data.length, output); + } - fcgienv[name] = value; - } + void doARequest() { + string[string] fcgienv; - void flushFcgi() { - FCGX_FFlush(output); + for(auto e = env; e !is null && *e !is null; e++) { + string cur = to!string(*e); + auto idx = cur.indexOf("="); + string name, value; + if(idx == -1) + name = cur; + else { + name = cur[0 .. idx]; + value = cur[idx + 1 .. $]; } - Cgi cgi; - try { - cgi = new CustomCgi(maxContentLength, fcgienv, &getFcgiChunk, &writeFcgi, &flushFcgi); - } catch(Throwable t) { - FCGX_PutStr(cast(ubyte*) t.msg.ptr, t.msg.length, error); - writeFcgi(cast(const(ubyte)[]) plainHttpError(true, "400 Bad Request", t)); - return; //continue; - } - assert(cgi !is null); - scope(exit) cgi.dispose(); - try { - fun(cgi); - cgi.close(); - } catch(Throwable t) { - // log it to the error stream - FCGX_PutStr(cast(ubyte*) t.msg.ptr, t.msg.length, error); - // handle it for the user, if we can - if(!handleException(cgi, t)) - return; // continue; - } + fcgienv[name] = value; } - auto lp = listeningPort(0); - FCGX_Request request; - if(lp) { - // if a listening port was specified on the command line, we want to spawn ourself - // (needed for nginx without spawn-fcgi, e.g. on Windows) - FCGX_Init(); - auto sock = FCGX_OpenSocket(toStringz(listeningHost() ~ ":" ~ to!string(lp)), 12); - if(sock < 0) - throw new Exception("Couldn't listen on the port"); - FCGX_InitRequest(&request, sock, 0); - while(FCGX_Accept_r(&request) >= 0) { - input = request.inStream; - output = request.outStream; - error = request.errStream; - env = request.envp; - doARequest(); - } - } else { - // otherwise, assume the httpd is doing it (the case for Apache, IIS, and Lighttpd) - // using the version with a global variable since we are separate processes anyway - while(FCGX_Accept(&input, &output, &error, &env) >= 0) { - doARequest(); - } + void flushFcgi() { + FCGX_FFlush(output); } - } else { - // standard CGI is the default version + Cgi cgi; try { - cgi = new CustomCgi(maxContentLength); - version(Posix) - cgi._outputFileHandle = 1; // stdout - else version(Windows) - cgi._outputFileHandle = GetStdHandle(STD_OUTPUT_HANDLE); - else static assert(0); + cgi = new CustomCgi(maxContentLength, fcgienv, &getFcgiChunk, &writeFcgi, &flushFcgi); } catch(Throwable t) { - version(CRuntime_Musl) { - // LockingTextWriter fails here - // so working around it - auto s = t.toString(); - stderr.rawWrite(s); - stdout.rawWrite(plainHttpError(true, "400 Bad Request", t)); - } else { - stderr.writeln(t.msg); - // the real http server will probably handle this; - // most likely, this is a bug in Cgi. But, oh well. - stdout.write(plainHttpError(true, "400 Bad Request", t)); - } - return; + FCGX_PutStr(cast(ubyte*) t.msg.ptr, t.msg.length, error); + writeFcgi(cast(const(ubyte)[]) plainHttpError(true, "400 Bad Request", t)); + return; //continue; } assert(cgi !is null); scope(exit) cgi.dispose(); - try { fun(cgi); cgi.close(); - } catch (Throwable t) { - version(CRuntime_Musl) { - // LockingTextWriter fails here - // so working around it - auto s = t.msg; - stderr.rawWrite(s); - } else { - stderr.writeln(t.msg); - } + } catch(Throwable t) { + // log it to the error stream + FCGX_PutStr(cast(ubyte*) t.msg.ptr, t.msg.length, error); + // handle it for the user, if we can if(!handleException(cgi, t)) - return; + return; // continue; + } + } + + auto lp = params.listeningPort; + + FCGX_Request request; + if(lp) { + // if a listening port was specified on the command line, we want to spawn ourself + // (needed for nginx without spawn-fcgi, e.g. on Windows) + FCGX_Init(); + auto sock = FCGX_OpenSocket(toStringz(params.listeningHost ~ ":" ~ to!string(lp)), 12); + if(sock < 0) + throw new Exception("Couldn't listen on the port"); + FCGX_InitRequest(&request, sock, 0); + while(FCGX_Accept_r(&request) >= 0) { + input = request.inStream; + output = request.outStream; + error = request.errStream; + env = request.envp; + doARequest(); + } + } else { + // otherwise, assume the httpd is doing it (the case for Apache, IIS, and Lighttpd) + // using the version with a global variable since we are separate processes anyway + while(FCGX_Accept(&input, &output, &error, &env) >= 0) { + doARequest(); + } + } +} + +/// Returns the default listening port for the current cgi configuration. 8085 for embedded httpd, 4000 for scgi, irrelevant for others. +ushort defaultListeningPort() { + version(netman_httpd) + return 8080; + else version(embedded_httpd_processes) + return 8085; + else version(embedded_httpd_threads) + return 8085; + else version(scgi) + return 4000; + else + return 0; +} + +/++ + This is the function [GenericMain] calls. View its source for some simple boilerplate you can copy/paste and modify, or you can call it yourself from your `main`. + + Params: + fun = Your request handler + CustomCgi = a subclass of Cgi, if you wise to customize it further + maxContentLength = max POST size you want to allow + args = command-line arguments + + History: + Documented Sept 26, 2020. ++/ +void cgiMainImpl(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)(string[] args) if(is(CustomCgi : Cgi)) { + if(tryAddonServers(args)) + return; + + if(trySimulatedRequest!(fun, CustomCgi)(args)) + return; + + RequestServer server; + // you can change the port here if you like + // server.listeningPort = 9000; + + // then call this to let the command line args override your default + server.configureFromCommandLine(args); + + // and serve the request(s). + server.serve!(fun, CustomCgi, maxContentLength)(); +} + +//version(plain_cgi) +void handleCgiRequest(alias fun, CustomCgi = Cgi, long maxContentLength = defaultMaxContentLength)() { + // standard CGI is the default version + Cgi cgi; + try { + cgi = new CustomCgi(maxContentLength); + version(Posix) + cgi._outputFileHandle = 1; // stdout + else version(Windows) + cgi._outputFileHandle = GetStdHandle(STD_OUTPUT_HANDLE); + else static assert(0); + } catch(Throwable t) { + version(CRuntime_Musl) { + // LockingTextWriter fails here + // so working around it + auto s = t.toString(); + stderr.rawWrite(s); + stdout.rawWrite(plainHttpError(true, "400 Bad Request", t)); + } else { + stderr.writeln(t.msg); + // the real http server will probably handle this; + // most likely, this is a bug in Cgi. But, oh well. + stdout.write(plainHttpError(true, "400 Bad Request", t)); + } + return; + } + assert(cgi !is null); + scope(exit) cgi.dispose(); + + try { + fun(cgi); + cgi.close(); + } catch (Throwable t) { + version(CRuntime_Musl) { + // LockingTextWriter fails here + // so working around it + auto s = t.msg; + stderr.rawWrite(s); + } else { + stderr.writeln(t.msg); + } + if(!handleException(cgi, t)) + return; + } +} + +/+ + The event loop for embedded_httpd_threads will prolly fiber dispatch + cgi constructors too, so slow posts will not monopolize a worker thread. + + May want to provide the worker task system just need to ensure all the fibers + has a big enough stack for real work... would also ideally like to reuse them. + + + So prolly bir would switch it to nonblocking. If it would block, it epoll + registers one shot with this existing fiber to take it over. + + new connection comes in. it picks a fiber off the free list, + or if there is none, it creates a new one. this fiber handles + this connection the whole time. + + epoll triggers the fiber when something comes in. it is called by + a random worker thread, it might change at any time. at least during + the constructor. maybe into the main body it will stay tied to a thread + just so TLS stuff doesn't randomly change in the middle. but I could + specify if you yield all bets are off. + + when the request is finished, if there's more data buffered, it just + keeps going. if there is no more data buffered, it epoll ctls to + get triggered when more data comes in. all one shot. + + when a connection is closed, the fiber returns and is then reset + and added to the free list. if the free list is full, the fiber is + just freed, this means it will balloon to a certain size but not generally + grow beyond that unless the activity keeps going. + + 256 KB stack i thnk per fiber. 4,000 active fibers per gigabyte of memory. + + So the fiber has its own magic methods to read and write. if they would block, it registers + for epoll and yields. when it returns, it read/writes and then returns back normal control. + + basically you issue the command and it tells you when it is done + + it needs to DEL the epoll thing when it is closed. add it when opened. mod it when anther thing issued + ++/ + +version(cgi_use_fiber) +class CgiFiber : Fiber { + this(void function(Socket) handler) { + this.handler = handler; + // FIXME: stack size + super(&run); + } + + Socket connection; + void function(Socket) handler; + + void run() { + handler(connection); + } + + void delegate() postYield; + + private void setPostYield(scope void delegate() py) @nogc { + postYield = cast(void delegate()) py; + } + + void proceed() { + call(); + auto py = postYield; + postYield = null; + if(py !is null) + py(); + if(state == State.TERM) { + import core.memory; + GC.removeRoot(cast(void*) this); } } } -version(embedded_httpd_threads) void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) { + assert(connection !is null); + version(cgi_use_fiber) { + auto fiber = new CgiFiber(&doThreadHttpConnectionGuts!(CustomCgi, fun)); + import core.memory; + GC.addRoot(cast(void*) fiber); + fiber.connection = connection; + fiber.proceed(); + } else { + doThreadHttpConnectionGuts!(CustomCgi, fun)(connection); + } +} + +void doThreadHttpConnectionGuts(CustomCgi, alias fun, bool alwaysCloseConnection = false)(Socket connection) { scope(failure) { // catch all for other errors sendAll(connection, plainHttpError(false, "500 Internal Server Error", null)); connection.close(); } - bool closeConnection; + bool closeConnection = alwaysCloseConnection; + + /+ + ubyte[4096] inputBuffer = void; + ubyte[__traits(classInstanceSize, BufferedInputRange)] birBuffer = void; + ubyte[__traits(classInstanceSize, CustomCgi)] cgiBuffer = void; + + birBuffer[] = cast(ubyte[]) typeid(BufferedInputRange).initializer()[]; + BufferedInputRange ir = cast(BufferedInputRange) cast(void*) birBuffer.ptr; + ir.__ctor(connection, inputBuffer[], true); + +/ + auto ir = new BufferedInputRange(connection); while(!ir.empty) { @@ -3665,6 +3947,7 @@ void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) { ir.popFront(); if(ir.sourceClosed) { connection.close(); + closeConnection = true; break; } } @@ -3710,14 +3993,18 @@ void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) { closeConnection = true; } - if(closeConnection) { + if(closeConnection || alwaysCloseConnection) { connection.close(); + ir.dispose(); + closeConnection = false; // don't reclose after loop break; } else { if(ir.front.length) { ir.popFront(); // we can't just discard the buffer, so get the next bit and keep chugging along } else if(ir.sourceClosed) { ir.source.close(); + ir.dispose(); + closeConnection = false; } else { continue; // break; // this was for a keepalive experiment @@ -3725,8 +4012,10 @@ void doThreadHttpConnection(CustomCgi, alias fun)(Socket connection) { } } - if(closeConnection) + if(closeConnection) { connection.close(); + ir.dispose(); + } // I am otherwise NOT closing it here because the parent thread might still be able to make use of the keep-alive connection! } @@ -3831,15 +4120,47 @@ void doThreadScgiConnection(CustomCgi, alias fun, long maxContentLength)(Socket } string printDate(DateTime date) { - return format( - "%.3s, %02d %.3s %d %02d:%02d:%02d GMT", // could be UTC too - to!string(date.dayOfWeek).capitalize, - date.day, - to!string(date.month).capitalize, - date.year, - date.hour, - date.minute, - date.second); + char[29] buffer = void; + printDateToBuffer(date, buffer[]); + return buffer.idup; +} + +int printDateToBuffer(DateTime date, char[] buffer) @nogc { + assert(buffer.length >= 29); + // 29 static length ? + + static immutable daysOfWeek = [ + "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" + ]; + + static immutable months = [ + null, "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" + ]; + + buffer[0 .. 3] = daysOfWeek[date.dayOfWeek]; + buffer[3 .. 5] = ", "; + buffer[5] = date.day / 10 + '0'; + buffer[6] = date.day % 10 + '0'; + buffer[7] = ' '; + buffer[8 .. 11] = months[date.month]; + buffer[11] = ' '; + auto y = date.year; + buffer[12] = cast(char) (y / 1000 + '0'); y %= 1000; + buffer[13] = cast(char) (y / 100 + '0'); y %= 100; + buffer[14] = cast(char) (y / 10 + '0'); y %= 10; + buffer[15] = cast(char) (y + '0'); + buffer[16] = ' '; + buffer[17] = date.hour / 10 + '0'; + buffer[18] = date.hour % 10 + '0'; + buffer[19] = ':'; + buffer[20] = date.minute / 10 + '0'; + buffer[21] = date.minute % 10 + '0'; + buffer[22] = ':'; + buffer[23] = date.second / 10 + '0'; + buffer[24] = date.second % 10 + '0'; + buffer[25 .. $] = " GMT"; + + return 29; } @@ -3930,6 +4251,57 @@ version(fastcgi) { import std.socket; +version(cgi_use_fiber) { + import core.thread; + import core.sys.linux.epoll; + + __gshared int epfd; +} + + +version(cgi_use_fiber) +private enum WakeupEvent { + Read = EPOLLIN, + Write = EPOLLOUT +} + +version(cgi_use_fiber) +private void registerEventWakeup(bool* registered, Socket source, WakeupEvent e) @nogc { + + // static cast since I know what i have in here and don't want to pay for dynamic cast + auto f = cast(CgiFiber) cast(void*) Fiber.getThis(); + + f.setPostYield = () { + if(*registered) { + // rearm + epoll_event evt; + evt.events = e | EPOLLONESHOT; + evt.data.ptr = cast(void*) f; + if(epoll_ctl(epfd, EPOLL_CTL_MOD, source.handle, &evt) == -1) + throw new Exception("epoll_ctl"); + } else { + // initial registration + *registered = true ; + int fd = source.handle; + epoll_event evt; + evt.events = e | EPOLLONESHOT; + evt.data.ptr = cast(void*) f; + if(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &evt) == -1) + throw new Exception("epoll_ctl"); + } + }; + + Fiber.yield(); + + f.setPostYield(null); +} + +version(cgi_use_fiber) +void unregisterSource(Socket s) { + epoll_event evt; + epoll_ctl(epfd, EPOLL_CTL_DEL, s.handle(), &evt); +} + // it is a class primarily for reference semantics // I might change this interface /// This is NOT ACTUALLY an input range! It is too different. Historical mistake kinda. @@ -3939,16 +4311,21 @@ class BufferedInputRange { this(new Socket(cast(socket_t) source, AddressFamily.INET), buffer); } - this(Socket source, ubyte[] buffer = null) { + this(Socket source, ubyte[] buffer = null, bool allowGrowth = true) { // if they connect but never send stuff to us, we don't want it wasting the process // so setting a time out - source.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"seconds"(3)); + version(cgi_use_fiber) + source.blocking = false; + else + source.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"seconds"(3)); + this.source = source; if(buffer is null) { underlyingBuffer = new ubyte[4096]; - allowGrowth = true; + this.allowGrowth = true; } else { underlyingBuffer = buffer; + this.allowGrowth = allowGrowth; } assert(underlyingBuffer.length); @@ -3959,6 +4336,17 @@ class BufferedInputRange { popFront(); // prime } + version(cgi_use_fiber) { + bool registered; + } + + void dispose() { + version(cgi_use_fiber) { + if(registered) + unregisterSource(source); + } + } + /** A slight difference from regular ranges is you can give it the maximum number of bytes to consume. @@ -4000,15 +4388,24 @@ class BufferedInputRange { auto ret = source.receive(freeSpace); if(ret == Socket.ERROR) { if(wouldHaveBlocked()) { - // gonna treat a timeout here as a close - sourceClosed = true; - return; + version(cgi_use_fiber) { + registerEventWakeup(®istered, source, WakeupEvent.Read); + goto try_again; + } else { + // gonna treat a timeout here as a close + sourceClosed = true; + return; + } } version(Posix) { import core.stdc.errno; if(errno == EINTR || errno == EAGAIN) { goto try_again; } + if(errno == ECONNRESET) { + sourceClosed = true; + return; + } } throw new Exception(lastSocketError); // FIXME } @@ -4019,6 +4416,7 @@ class BufferedInputRange { //import std.stdio; writeln(view.ptr); writeln(underlyingBuffer.ptr); writeln(view.length, " ", ret, " = ", view.length + ret); view = underlyingBuffer[view.ptr - underlyingBuffer.ptr .. view.length + ret]; + //import std.stdio; writeln(cast(string) view); } while(view.length < minBytesToSettleFor); } @@ -4091,6 +4489,11 @@ class ListeningConnectionManager { running = true; shared(int) loopBroken; + version(Posix) { + import core.sys.posix.signal; + signal(SIGPIPE, SIG_IGN); + } + version(cgi_no_threads) { // NEVER USE THIS // it exists only for debugging and other special occasions @@ -4099,6 +4502,7 @@ class ListeningConnectionManager { // thing when a request is slow while(!loopBroken && running) { auto sn = listener.accept(); + cloexec(sn); try { handler(sn); } catch(Exception e) { @@ -4107,40 +4511,82 @@ class ListeningConnectionManager { } } } else { - semaphore = new Semaphore(); + import std.parallelism; - ConnectionThread[16] threads; - foreach(i, ref thread; threads) { - thread = new ConnectionThread(this, handler, cast(int) i); - thread.start(); + version(cgi_use_fork) { + //asm { int 3; } + fork(); } - /+ - version(linux) { + version(cgi_use_fiber) { import core.sys.linux.epoll; - epoll_fd = epoll_create1(EPOLL_CLOEXEC); - if(epoll_fd == -1) + epfd = epoll_create1(EPOLL_CLOEXEC); + if(epfd == -1) throw new Exception("epoll_create1 " ~ to!string(errno)); scope(exit) { import core.sys.posix.unistd; - close(epoll_fd); + close(epfd); } - epoll_event[64] events; - epoll_event ev; - ev.events = EPOLLIN; + ev.events = EPOLLIN | EPOLLEXCLUSIVE; // EPOLLEXCLUSIVE is only available on kernels since like 2017 but that's prolly good enough. ev.data.fd = listener.handle; - if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listener.handle, &ev) == -1) + if(epoll_ctl(epfd, EPOLL_CTL_ADD, listener.handle, &ev) == -1) throw new Exception("epoll_ctl " ~ to!string(errno)); + + WorkerThread[] threads = new WorkerThread[](totalCPUs * 1 + 1); + foreach(i, ref thread; threads) { + thread = new WorkerThread(this, handler, cast(int) i); + thread.start(); + } + + bool fiber_crash_check() { + bool hasAnyRunning; + foreach(thread; threads) { + if(!thread.isRunning) { + thread.join(); + } else hasAnyRunning = true; + } + + return (!hasAnyRunning); + } + + + while(running) { + Thread.sleep(1.seconds); + if(fiber_crash_check()) + break; + } + + } else { + semaphore = new Semaphore(); + + // I times 4 here because there's a good chance some will be blocked on i/o. + ConnectionThread[] threads = new ConnectionThread[](totalCPUs * 4); + foreach(i, ref thread; threads) { + thread = new ConnectionThread(this, handler, cast(int) i); + thread.start(); + } } - +/ while(!loopBroken && running) { Socket sn; + bool crash_check() { + bool hasAnyRunning; + foreach(thread; threads) { + if(!thread.isRunning) { + thread.join(); + } else hasAnyRunning = true; + } + + return (!hasAnyRunning); + } + + void accept_new_connection() { sn = listener.accept(); + cloexec(sn); if(tcp) { // disable Nagle's algorithm to avoid a 40ms delay when we send/recv // on the socket because we do some buffering internally. I think this helps, @@ -4167,50 +4613,9 @@ class ListeningConnectionManager { semaphore.notify(); } - bool crash_check() { - bool hasAnyRunning; - foreach(thread; threads) { - if(!thread.isRunning) { - thread.join(); - } else hasAnyRunning = true; - } - - return (!hasAnyRunning); - } - - - /+ - version(linux) { - auto nfds = epoll_wait(epoll_fd, events.ptr, events.length, -1); - if(nfds == -1) { - if(errno == EINTR) - continue; - throw new Exception("epoll_wait " ~ to!string(errno)); - } - - foreach(idx; 0 .. nfds) { - auto flags = events[idx].events; - auto fd = events[idx].data.fd; - if(fd == listener.handle) { - accept_new_connection(); - existing_connection_new_data(); - } else { - if(flags & (EPOLLHUP | EPOLLERR | EPOLLRDHUP)) { - import core.sys.posix.unistd; - close(fd); - } else { - sn = new Socket(cast(socket_t) fd, tcp ? AddressFamily.INET : AddressFamily.UNIX); - import std.stdio; writeln("existing_connection_new_data"); - existing_connection_new_data(); - } - } - } - } else { - +/ - accept_new_connection(); - existing_connection_new_data(); - //} + accept_new_connection(); + existing_connection_new_data(); if(crash_check()) break; @@ -4232,38 +4637,13 @@ class ListeningConnectionManager { this(string host, ushort port, void function(Socket) handler) { this.handler = handler; - if(host.startsWith("unix:")) { - version(Posix) { - listener = new Socket(AddressFamily.UNIX, SocketType.STREAM); - string filename = host["unix:".length .. $].idup; - listener.bind(new UnixAddress(filename)); - cleanup = delegate() { - import std.file; - remove(filename); - }; - tcp = false; - } else { - throw new Exception("unix sockets not supported on this system"); - } - } else if(host.startsWith("abstract:")) { - version(linux) { - listener = new Socket(AddressFamily.UNIX, SocketType.STREAM); - string filename = "\0" ~ host["abstract:".length .. $]; - import std.stdio; stderr.writeln("Listening to abstract unix domain socket: ", host["abstract:".length .. $]); - listener.bind(new UnixAddress(filename)); - tcp = false; - } else { - throw new Exception("abstract unix sockets not supported on this system"); - } - } else { - listener = new TcpSocket(); - listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); - listener.bind(host.length ? parseAddress(host, port) : new InternetAddress(port)); - tcp = true; - } + listener = startListening(host, port, tcp, cleanup, 128); + version(cgi_use_fiber) version(cgi_use_fork) + listener.blocking = false; + + // this is the UI control thread and thus gets more priority Thread.getThis.priority = Thread.PRIORITY_MAX; - listener.listen(128); } Socket listener; @@ -4275,14 +4655,66 @@ class ListeningConnectionManager { } } +Socket startListening(string host, ushort port, ref bool tcp, ref void delegate() cleanup, int backQueue) { + Socket listener; + if(host.startsWith("unix:")) { + version(Posix) { + listener = new Socket(AddressFamily.UNIX, SocketType.STREAM); + cloexec(listener); + string filename = host["unix:".length .. $].idup; + listener.bind(new UnixAddress(filename)); + cleanup = delegate() { + listener.close(); + import std.file; + remove(filename); + }; + tcp = false; + } else { + throw new Exception("unix sockets not supported on this system"); + } + } else if(host.startsWith("abstract:")) { + version(linux) { + listener = new Socket(AddressFamily.UNIX, SocketType.STREAM); + cloexec(listener); + string filename = "\0" ~ host["abstract:".length .. $]; + import std.stdio; stderr.writeln("Listening to abstract unix domain socket: ", host["abstract:".length .. $]); + listener.bind(new UnixAddress(filename)); + tcp = false; + } else { + throw new Exception("abstract unix sockets not supported on this system"); + } + } else { + listener = new TcpSocket(); + cloexec(listener); + listener.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); + listener.bind(host.length ? parseAddress(host, port) : new InternetAddress(port)); + cleanup = delegate() { + listener.close(); + }; + tcp = true; + } + + listener.listen(backQueue); + return listener; +} + // helper function to send a lot to a socket. Since this blocks for the buffer (possibly several times), you should probably call it in a separate thread or something. void sendAll(Socket s, const(void)[] data, string file = __FILE__, size_t line = __LINE__) { if(data.length == 0) return; ptrdiff_t amount; + //import std.stdio; writeln("***",cast(string) data,"///"); do { amount = s.send(data); - if(amount == Socket.ERROR) + if(amount == Socket.ERROR) { + version(cgi_use_fiber) { + if(wouldHaveBlocked()) { + bool registered = true; + registerEventWakeup(®istered, s, WakeupEvent.Write); + continue; + } + } throw new ConnectionException(s, lastSocketError, file, line); + } assert(amount > 0); data = data[amount .. $]; } while(data.length); @@ -4404,6 +4836,68 @@ class ConnectionThread : Thread { int myThreadNumber; } +version(cgi_use_fiber) +class WorkerThread : Thread { + this(ListeningConnectionManager lcm, CMT dg, int myThreadNumber) { + this.lcm = lcm; + this.dg = dg; + this.myThreadNumber = myThreadNumber; + super(&run); + } + + void run() { + while(lcm.running) { + Socket sn; + + epoll_event[64] events; + auto nfds = epoll_wait(epfd, events.ptr, events.length, -1); + if(nfds == -1) { + if(errno == EINTR) + continue; + throw new Exception("epoll_wait " ~ to!string(errno)); + } + + foreach(idx; 0 .. nfds) { + auto flags = events[idx].events; + + if(cast(size_t) events[idx].data.ptr == cast(size_t) lcm.listener.handle) { + // this try/catch is because it is set to non-blocking mode + // and Phobos' stupid api throws an exception instead of returning + // if it would block. Why would it block? because a forked process + // might have beat us to it, but the wakeup event thundered our herds. + version(cgi_use_fork) { + try + sn = lcm.listener.accept(); + catch(SocketAcceptException e) { continue; } + } else { + sn = lcm.listener.accept(); + } + + cloexec(sn); + if(lcm.tcp) { + // disable Nagle's algorithm to avoid a 40ms delay when we send/recv + // on the socket because we do some buffering internally. I think this helps, + // certainly does for small requests, and I think it does for larger ones too + sn.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, 1); + + sn.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, dur!"seconds"(10)); + } + + dg(sn); + } else { + auto fiber = cast(CgiFiber) events[idx].data.ptr; + fiber.proceed(); + } + } + } + } + + ListeningConnectionManager lcm; + CMT dg; + int myThreadNumber; +} + + /* Done with network helper */ /* Helpers for doing temporary files. Used both here and in web.d */ @@ -4687,6 +5181,9 @@ version(cgi_with_websocket) { return true; } + if(bfr.sourceClosed) + return false; + bfr.popFront(0); if(bfr.sourceClosed) return false; @@ -4910,6 +5407,7 @@ version(cgi_with_websocket) { // that's how it indicates that it needs more data if(d is orig) return WebSocketFrame.init; + m.unmaskInPlace(); switch(m.opcode) { case WebSocketOpcode.continuation: if(continuingData.length + m.data.length > config.maximumMessageSize) @@ -4963,7 +5461,10 @@ version(cgi_with_websocket) { default: // ignore though i could and perhaps should throw too } } - receiveBufferUsedLength -= s.length - d.length; + + import core.stdc.string; + memmove(receiveBuffer.ptr, d.ptr, d.length); + receiveBufferUsedLength = d.length; return m; } @@ -5109,7 +5610,7 @@ version(cgi_with_websocket) { } headerScratchPos += 8; - } else if(realLength > 127) { + } else if(realLength > 125) { // use 16 bit length b2 |= 0x7e; @@ -5223,19 +5724,20 @@ version(cgi_with_websocket) { msg.data = d[0 .. cast(size_t) msg.realLength]; d = d[cast(size_t) msg.realLength .. $]; - if(msg.masked) { - // let's just unmask it now + return msg; + } + + void unmaskInPlace() { + if(this.masked) { int keyIdx = 0; - foreach(i; 0 .. msg.data.length) { - msg.data[i] = msg.data[i] ^ msg.maskingKey[keyIdx]; + foreach(i; 0 .. this.data.length) { + this.data[i] = this.data[i] ^ this.maskingKey[keyIdx]; if(keyIdx == 3) keyIdx = 0; else keyIdx++; } } - - return msg; } char[] textData() { @@ -5272,13 +5774,13 @@ version(Posix) { //private: // template for laziness -void startWebsocketServer()() { +void startAddonServer()(string arg) { version(linux) { import core.sys.posix.unistd; pid_t pid; const(char)*[16] args; args[0] = "ARSD_CGI_WEBSOCKET_SERVER"; - args[1] = "--websocket-server"; + args[1] = arg.ptr; posix_spawn(&pid, "/proc/self/exe", null, null, @@ -5295,10 +5797,12 @@ void startWebsocketServer()() { startupInfo.cb = cast(DWORD) startupInfo.sizeof; PROCESS_INFORMATION processInfo; + import std.utf; + // I *MIGHT* need to run it as a new job or a service... auto ret = CreateProcessW( filename.ptr, - "--websocket-server"w, + toUTF16z(arg), null, // process attributes null, // thread attributes false, // inherit handles @@ -5379,7 +5883,7 @@ version(Posix) { } version(with_addon_servers_connections) -LocalServerConnectionHandle openLocalServerConnection(string name) { +LocalServerConnectionHandle openLocalServerConnection()(string name, string arg) { version(Posix) { import core.sys.posix.unistd; import core.sys.posix.sys.un; @@ -5391,6 +5895,8 @@ LocalServerConnectionHandle openLocalServerConnection(string name) { scope(failure) close(sock); + cloexec(sock); + // add-on server processes are assumed to be local, and thus will // use unix domain sockets. Besides, I want to pass sockets to them, // so it basically must be local (except for the session server, but meh). @@ -5405,8 +5911,21 @@ LocalServerConnectionHandle openLocalServerConnection(string name) { addr.sun_path[0 .. name.length] = cast(typeof(addr.sun_path[])) name[]; } - if(connect(sock, cast(sockaddr*) &addr, addr.sizeof) == -1) - throw new Exception("connect " ~ to!string(errno)); + bool alreadyTried; + + try_again: + + if(connect(sock, cast(sockaddr*) &addr, addr.sizeof) == -1) { + if(!alreadyTried && errno == ECONNREFUSED) { + // try auto-spawning the server, then attempt connection again + startAddonServer(arg); + import core.thread; + Thread.sleep(50.msecs); + alreadyTried = true; + goto try_again; + } else + throw new Exception("connect " ~ to!string(errno)); + } return sock; } else version(Windows) { @@ -5465,11 +5984,11 @@ struct IoOp { private int bufferLengthUsed; private ubyte[1] internalBuffer; // it can be overallocated! - ubyte[] allocatedBuffer() { + ubyte[] allocatedBuffer() return { return internalBuffer.ptr[0 .. bufferLengthAllocated]; } - ubyte[] usedBuffer() { + ubyte[] usedBuffer() return { return allocatedBuffer[0 .. bufferLengthUsed]; } @@ -5712,7 +6231,7 @@ private immutable void delegate(string[])[string] scheduledJobHandlers; version(with_breaking_cgi_features) mixin(q{ -mixin template ImplementRpcClientInterface(T, string serverPath) { +mixin template ImplementRpcClientInterface(T, string serverPath, string cmdArg) { static import std.traits; // derivedMembers on an interface seems to give exactly what I want: the virtual functions we need to implement. so I am just going to use it directly without more filtering. @@ -5774,7 +6293,7 @@ mixin template ImplementRpcClientInterface(T, string serverPath) { } void connect() { - connectionHandle = openLocalServerConnection(serverPath); + connectionHandle = openLocalServerConnection(serverPath, cmdArg); } void disconnect() { @@ -5801,7 +6320,7 @@ void dispatchRpcServer(Interface, Class)(Class this_, ubyte[] data, int fd) if(i import std.traits; sw: switch(calledIdx) { - static foreach(idx, memberName; __traits(derivedMembers, Interface)) + foreach(idx, memberName; __traits(derivedMembers, Interface)) static if(__traits(isVirtualFunction, __traits(getMember, Interface, memberName))) { case idx: assert(calledFunction == __traits(getMember, Interface, memberName).mangleof); @@ -5842,7 +6361,7 @@ private struct SerializationBuffer { bufferLocation += data.length; } - ubyte[] sendable() { + ubyte[] sendable() return { return bufferBacking[0 .. bufferLocation]; } } @@ -5942,7 +6461,7 @@ interface Session(Data) : SessionObject { static if(is(typeof(__traits(getMember, Data, memberName)))) mixin(q{ @property inout(typeof(__traits(getMember, Data, memberName))) } ~ memberName ~ q{ () inout; - @property void } ~ memberName ~ q{ (typeof(__traits(getMember, Data, memberName)) value); + @property typeof(__traits(getMember, Data, memberName)) } ~ memberName ~ q{ (typeof(__traits(getMember, Data, memberName)) value); }); } @@ -6030,12 +6549,13 @@ class BasicDataServerSession(Data) : Session!Data { // basically. Assuming the session is POD this should be fine. return cast(typeof(return)) to!(typeof(__traits(getMember, Data, memberName)))(v); } - @property void } ~ memberName ~ q{ (typeof(__traits(getMember, Data, memberName)) value) { + @property typeof(__traits(getMember, Data, memberName)) } ~ memberName ~ q{ (typeof(__traits(getMember, Data, memberName)) value) { if(sessionId is null) start(); import std.conv; import std.traits; BasicDataServer.connection.setSessionData(sessionId, fullyQualifiedName!Data ~ "." ~ memberName, to!string(value)); + return value; } }); } @@ -6059,8 +6579,8 @@ class MockSession(Data) : Session!Data { @property inout(typeof(__traits(getMember, Data, memberName))) } ~ memberName ~ q{ () inout { return __traits(getMember, store_, memberName); } - @property void } ~ memberName ~ q{ (typeof(__traits(getMember, Data, memberName)) value) { - __traits(getMember, store_, memberName) = value; + @property typeof(__traits(getMember, Data, memberName)) } ~ memberName ~ q{ (typeof(__traits(getMember, Data, memberName)) value) { + return __traits(getMember, store_, memberName) = value; } }); } @@ -6094,7 +6614,7 @@ interface BasicDataServer { version(with_addon_servers_connections) class BasicDataServerConnection : BasicDataServer { - mixin ImplementRpcClientInterface!(BasicDataServer, "/tmp/arsd_session_server"); + mixin ImplementRpcClientInterface!(BasicDataServer, "/tmp/arsd_session_server", "--session-server"); } version(with_addon_servers) @@ -6268,7 +6788,7 @@ interface ScheduledJobServer { version(with_addon_servers_connections) class ScheduledJobServerConnection : ScheduledJobServer { - mixin ImplementRpcClientInterface!(ScheduledJobServer, "/tmp/arsd_scheduled_job_server"); + mixin ImplementRpcClientInterface!(ScheduledJobServer, "/tmp/arsd_scheduled_job_server", "--timer-server"); } version(with_addon_servers) @@ -6405,7 +6925,7 @@ interface EventSourceServer { cgi.flush(); cgi.closed = true; - auto s = openLocalServerConnection("/tmp/arsd_cgi_event_server"); + auto s = openLocalServerConnection("/tmp/arsd_cgi_event_server", "--event-server"); scope(exit) closeLocalServerConnection(s); @@ -6443,7 +6963,7 @@ interface EventSourceServer { [sendEventToEventServer] +/ public static void sendEvent(string url, string event, string data, int lifetime) { - auto s = openLocalServerConnection("/tmp/arsd_cgi_event_server"); + auto s = openLocalServerConnection("/tmp/arsd_cgi_event_server", "--event-server"); scope(exit) closeLocalServerConnection(s); @@ -6564,14 +7084,14 @@ final class EventSourceServerImplementation : EventSourceServer, EventIoServer { int lastEventIdLength; char[32] lastEventIdBuffer = 0; - char[] url() { + char[] url() return { return urlBuffer[0 .. urlLength]; } void url(in char[] u) { urlBuffer[0 .. u.length] = u[]; urlLength = cast(int) u.length; } - char[] lastEventId() { + char[] lastEventId() return { return lastEventIdBuffer[0 .. lastEventIdLength]; } void populate(bool responseChunked, in char[] url, in char[] lastEventId) @@ -6599,13 +7119,13 @@ final class EventSourceServerImplementation : EventSourceServer, EventIoServer { char[2048] messageBuffer = 0; int _lifetime; - char[] message() { + char[] message() return { return messageBuffer[0 .. messageLength]; } - char[] type() { + char[] type() return { return typeBuffer[0 .. typeLength]; } - char[] url() { + char[] url() return { return urlBuffer[0 .. urlLength]; } void url(in char[] u) { @@ -6733,6 +7253,8 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS scope(failure) close(sock); + cloexec(sock); + // add-on server processes are assumed to be local, and thus will // use unix domain sockets. Besides, I want to pass sockets to them, // so it basically must be local (except for the session server, but meh). @@ -6840,6 +7362,7 @@ void runAddonServer(EIS)(string localListenerName, EIS eis) if(is(EIS : EventIoS } throw new Exception("accept " ~ to!string(errno)); } + cloexec(ns); makeNonBlocking(ns); auto niop = allocateIoOp(ns, IoOp.ReadSocketHandle, 4096, &eis.handleLocalConnectionData); @@ -7078,11 +7601,11 @@ struct DispatcherDefinition(alias dispatchHandler, DispatcherDetails = typeof(nu immutable(DispatcherDetails) details; } -private string urlify(string name) { +private string urlify(string name) pure { return beautify(name, '-', true); } -private string beautify(string name, char space = ' ', bool allLowerCase = false) { +private string beautify(string name, char space = ' ', bool allLowerCase = false) pure { if(name == "id") return allLowerCase ? name : "ID"; @@ -7203,7 +7726,7 @@ auto callFromCgi(alias method, T)(T dg, Cgi cgi) { // first, check for missing arguments and initialize to defaults if necessary static if(is(typeof(method) P == __parameters)) - static foreach(idx, param; P) {{ + foreach(idx, param; P) {{ // see: mustNotBeSetFromWebParams static if(is(param : Cgi)) { static assert(!is(param == immutable)); @@ -7213,7 +7736,7 @@ auto callFromCgi(alias method, T)(T dg, Cgi cgi) { cast() params[idx] = cgi.getSessionObject!D(); } else { bool populated; - static foreach(uda; __traits(getAttributes, P[idx .. idx + 1])) { + foreach(uda; __traits(getAttributes, P[idx .. idx + 1])) { static if(is(uda == ifCalledFromWeb!func, alias func)) { static if(is(typeof(func(cgi)))) params[idx] = func(cgi); @@ -7279,7 +7802,7 @@ auto callFromCgi(alias method, T)(T dg, Cgi cgi) { // set the child member switch(paramName) { - static foreach(idx, memberName; __traits(allMembers, T)) + foreach(idx, memberName; __traits(allMembers, T)) static if(__traits(compiles, __traits(getMember, T, memberName).offsetof)) { // data member! case memberName: @@ -7373,7 +7896,7 @@ auto callFromCgi(alias method, T)(T dg, Cgi cgi) { sw: switch(paramName) { static if(is(typeof(method) P == __parameters)) - static foreach(idx, param; P) { + foreach(idx, param; P) { static if(mustNotBeSetFromWebParams!(P[idx], __traits(getAttributes, P[idx .. idx + 1]))) { // cannot be set from the outside } else { @@ -7424,7 +7947,7 @@ private bool mustNotBeSetFromWebParams(T, attrs...)() { } else static if(__traits(compiles, T.getAutomaticallyForCgi(Cgi.init))) { return true; } else { - static foreach(uda; attrs) + foreach(uda; attrs) static if(is(uda == ifCalledFromWeb!func, alias func)) return true; return false; @@ -7432,7 +7955,7 @@ private bool mustNotBeSetFromWebParams(T, attrs...)() { } private bool hasIfCalledFromWeb(attrs...)() { - static foreach(uda; attrs) + foreach(uda; attrs) static if(is(uda == ifCalledFromWeb!func, alias func)) return true; return false; @@ -7508,11 +8031,11 @@ private bool hasIfCalledFromWeb(attrs...)() { --- class MyPresenter : WebPresenter!(MyPresenter) { @Override - void presentSuccessfulReturnAsHtml(T : CustomType)(Cgi cgi, T ret) { + void presentSuccessfulReturnAsHtml(T : CustomType)(Cgi cgi, T ret, typeof(null) meta) { // present the CustomType } @Override - void presentSuccessfulReturnAsHtml(T)(Cgi cgi, T ret) { + void presentSuccessfulReturnAsHtml(T)(Cgi cgi, T ret, typeof(null) meta) { // handle everything else via the super class, which will call // back to your class when appropriate super.presentSuccessfulReturnAsHtml(cgi, ret); @@ -7520,6 +8043,8 @@ private bool hasIfCalledFromWeb(attrs...)() { } --- + The meta argument in there can be overridden by your own facility. + +/ class WebPresenter(CRTP) { @@ -7657,26 +8182,35 @@ html", true, true); cgi.write(c.parentDocument.toString(), true); } + template methodMeta(alias method) { + enum methodMeta = null; + } + + void presentSuccessfulReturn(T, Meta)(Cgi cgi, T ret, Meta meta, string format) { + // FIXME? format? + (cast(CRTP) this).presentSuccessfulReturnAsHtml(cgi, ret, meta); + } + /// typeof(null) (which is also used to represent functions returning `void`) do nothing /// in the default presenter - allowing the function to have full low-level control over the /// response. - void presentSuccessfulReturnAsHtml(T : typeof(null))(Cgi cgi, T ret) { + void presentSuccessfulReturn(T : typeof(null))(Cgi cgi, T ret, typeof(null) meta, string format) { // nothing intentionally! } /// Redirections are forwarded to [Cgi.setResponseLocation] - void presentSuccessfulReturnAsHtml(T : Redirection)(Cgi cgi, T ret) { + void presentSuccessfulReturn(T : Redirection)(Cgi cgi, T ret, typeof(null) meta, string format) { cgi.setResponseLocation(ret.to, true, getHttpCodeText(ret.code)); } /// Multiple responses deconstruct the algebraic type and forward to the appropriate handler at runtime - void presentSuccessfulReturnAsHtml(T : MultipleResponses!Types, Types...)(Cgi cgi, T ret) { + void presentSuccessfulReturn(T : MultipleResponses!Types, Types...)(Cgi cgi, T ret, typeof(null) meta, string format) { bool outputted = false; - static foreach(index, type; Types) { + foreach(index, type; Types) { if(ret.contains == index) { assert(!outputted); outputted = true; - (cast(CRTP) this).presentSuccessfulReturnAsHtml(cgi, ret.payload[index]); + (cast(CRTP) this).presentSuccessfulReturnAsHtml(cgi, ret.payload[index], meta); } } if(!outputted) @@ -7684,14 +8218,14 @@ html", true, true); } /// An instance of the [arsd.dom.FileResource] interface has its own content type; assume it is a download of some sort. - void presentSuccessfulReturnAsHtml(T : FileResource)(Cgi cgi, T ret) { + void presentSuccessfulReturn(T : FileResource)(Cgi cgi, T ret, typeof(null) meta, string format) { cgi.setCache(true); // not necessarily true but meh cgi.setResponseContentType(ret.contentType); cgi.write(ret.getData(), true); } - /// And the default handler will call [formatReturnValueAsHtml] and place it inside the [htmlContainer]. - void presentSuccessfulReturnAsHtml(T)(Cgi cgi, T ret) { + /// And the default handler for HTML will call [formatReturnValueAsHtml] and place it inside the [htmlContainer]. + void presentSuccessfulReturnAsHtml(T)(Cgi cgi, T ret, typeof(null) meta) { auto container = this.htmlContainer(); container.appendChild(formatReturnValueAsHtml(ret)); cgi.write(container.parentDocument.toString(), true); @@ -7787,7 +8321,7 @@ html", true, true); auto fieldset = div.addChild("fieldset"); fieldset.addChild("legend", beautify(T.stringof)); // FIXME fieldset.addChild("input", name); - static foreach(idx, memberName; __traits(allMembers, T)) + foreach(idx, memberName; __traits(allMembers, T)) static if(__traits(compiles, __traits(getMember, T, memberName).offsetof)) { fieldset.appendChild(elementFor!(typeof(__traits(getMember, T, memberName)))(beautify(memberName), name ~ "." ~ memberName)); } @@ -7862,7 +8396,11 @@ html", true, true); form.addClass("automatic-form"); - form.addChild("h3", beautify(__traits(identifier, method))); + string formDisplayName = beautify(__traits(identifier, method)); + foreach(attr; __traits(getAttributes, method)) + static if(is(typeof(attr) == DisplayName)) + formDisplayName = attr.name; + form.addChild("h3", formDisplayName); import std.traits; @@ -7871,13 +8409,13 @@ html", true, true); //alias defaults = ParameterDefaults!method; static if(is(typeof(method) P == __parameters)) - static foreach(idx, _; P) {{ + foreach(idx, _; P) {{ alias param = P[idx .. idx + 1]; static if(!mustNotBeSetFromWebParams!(param[0], __traits(getAttributes, param))) { string displayName = beautify(__traits(identifier, param)); - static foreach(attr; __traits(getAttributes, param)) + foreach(attr; __traits(getAttributes, param)) static if(is(typeof(attr) == DisplayName)) displayName = attr.name; auto i = form.appendChild(elementFor!(param)(displayName, __traits(identifier, param))); @@ -7905,10 +8443,10 @@ html", true, true); //alias idents = ParameterIdentifierTuple!method; //alias defaults = ParameterDefaults!method; - static foreach(idx, memberName; __traits(derivedMembers, T)) {{ + foreach(idx, memberName; __traits(derivedMembers, T)) {{ static if(__traits(compiles, __traits(getMember, obj, memberName).offsetof)) { string displayName = beautify(memberName); - static foreach(attr; __traits(getAttributes, __traits(getMember, T, memberName))) + foreach(attr; __traits(getAttributes, __traits(getMember, T, memberName))) static if(is(typeof(attr) == DisplayName)) displayName = attr.name; form.appendChild(elementFor!(typeof(__traits(getMember, T, memberName)))(displayName, memberName)); @@ -7930,11 +8468,17 @@ html", true, true); } else static if(is(T : Element)) { return t; } else static if(is(T == MultipleResponses!Types, Types...)) { - static foreach(index, type; Types) { + foreach(index, type; Types) { if(t.contains == index) return formatReturnValueAsHtml(t.payload[index]); } assert(0); + } else static if(is(T == Paginated!E, E)) { + auto e = Element.make("div").addClass("paginated-result"); + e.appendChild(formatReturnValueAsHtml(t.items)); + if(t.nextPageUrl.length) + e.appendChild(Element.make("a", "Next Page", t.nextPageUrl)); + return e; } else static if(isIntegral!T || isSomeString!T || isFloatingPoint!T) { return Element.make("span", to!string(t), "automatic-data-display"); } else static if(is(T == V[K], K, V)) { @@ -7949,7 +8493,7 @@ html", true, true); auto dl = Element.make("dl"); dl.addClass("automatic-data-display"); - static foreach(idx, memberName; __traits(allMembers, T)) + foreach(idx, memberName; __traits(allMembers, T)) static if(__traits(compiles, __traits(getMember, T, memberName).offsetof)) { dl.addChild("dt", memberName); dl.addChild("dt", formatReturnValueAsHtml(__traits(getMember, t, memberName))); @@ -7964,7 +8508,7 @@ html", true, true); auto table = cast(Table) Element.make("table"); table.addClass("automatic-data-display"); string[] names; - static foreach(idx, memberName; __traits(derivedMembers, E)) + foreach(idx, memberName; __traits(derivedMembers, E)) static if(__traits(compiles, __traits(getMember, E, memberName).offsetof)) { names ~= beautify(memberName); } @@ -7972,7 +8516,7 @@ html", true, true); foreach(l; t) { auto tr = table.appendRow(); - static foreach(idx, memberName; __traits(derivedMembers, E)) + foreach(idx, memberName; __traits(derivedMembers, E)) static if(__traits(compiles, __traits(getMember, E, memberName).offsetof)) { static if(memberName == "id") { string val = to!string(__traits(getMember, l, memberName)); @@ -7990,7 +8534,7 @@ html", true, true); auto table = cast(Table) Element.make("table"); table.addClass("automatic-data-display"); string[] names; - static foreach(idx, memberName; __traits(allMembers, E)) + foreach(idx, memberName; __traits(allMembers, E)) static if(__traits(compiles, __traits(getMember, E, memberName).offsetof)) { names ~= beautify(memberName); } @@ -7998,7 +8542,7 @@ html", true, true); foreach(l; t) { auto tr = table.appendRow(); - static foreach(idx, memberName; __traits(allMembers, E)) + foreach(idx, memberName; __traits(allMembers, E)) static if(__traits(compiles, __traits(getMember, E, memberName).offsetof)) { tr.addChild("td", formatReturnValueAsHtml(__traits(getMember, l, memberName))); } @@ -8087,7 +8631,7 @@ struct MultipleResponses(T...) { alias findHandler = findHandler!(type, HandlersToCheck[1 .. $]); } } - static foreach(index, type; T) {{ + foreach(index, type; T) { alias handler = findHandler!(type, Handlers); static if(is(handler == void)) static assert(0, "Type " ~ type.stringof ~ " was not handled by visitor"); @@ -8095,7 +8639,7 @@ struct MultipleResponses(T...) { if(index == contains) handler(payload[index]); } - }} + } } /+ @@ -8195,7 +8739,7 @@ private auto serveApiInternal(T)(string urlPrefix) { static if(is(typeof(T.__ctor) P == __parameters)) { P params; - static foreach(pidx, param; P) { + foreach(pidx, param; P) { static if(is(param : Cgi)) { static assert(!is(param == immutable)); cast() params[pidx] = cgi; @@ -8205,7 +8749,7 @@ private auto serveApiInternal(T)(string urlPrefix) { } else { static if(hasIfCalledFromWeb!(__traits(getAttributes, P[pidx .. pidx + 1]))) { - static foreach(uda; __traits(getAttributes, P[pidx .. pidx + 1])) { + foreach(uda; __traits(getAttributes, P[pidx .. pidx + 1])) { static if(is(uda == ifCalledFromWeb!func, alias func)) { static if(is(typeof(func(cgi)))) params[pidx] = func(cgi); @@ -8285,9 +8829,9 @@ private auto serveApiInternal(T)(string urlPrefix) { hack ~= "/"; switch(hack) { - static foreach(methodName; __traits(derivedMembers, T)) + foreach(methodName; __traits(derivedMembers, T)) static if(methodName != "__ctor") - static foreach(idx, overload; __traits(getOverloads, T, methodName)) {{ + foreach(idx, overload; __traits(getOverloads, T, methodName)) { static if(is(typeof(overload) P == __parameters)) static if(is(typeof(overload) R == return)) static if(__traits(getProtection, overload) == "public" || __traits(getProtection, overload) == "export") @@ -8305,7 +8849,9 @@ private auto serveApiInternal(T)(string urlPrefix) { P params; - static foreach(pidx, param; P) { + string ident; + + foreach(pidx, param; P) { static if(is(param : Cgi)) { static assert(!is(param == immutable)); cast() params[pidx] = cgi; @@ -8314,7 +8860,7 @@ private auto serveApiInternal(T)(string urlPrefix) { cast() params[pidx] = cgi.getSessionObject!D(); } else { static if(hasIfCalledFromWeb!(__traits(getAttributes, P[pidx .. pidx + 1]))) { - static foreach(uda; __traits(getAttributes, P[pidx .. pidx + 1])) { + foreach(uda; __traits(getAttributes, P[pidx .. pidx + 1])) { static if(is(uda == ifCalledFromWeb!func, alias func)) { static if(is(typeof(func(cgi)))) params[pidx] = func(cgi); @@ -8327,7 +8873,7 @@ private auto serveApiInternal(T)(string urlPrefix) { static if(__traits(compiles, { params[pidx] = param.getAutomaticallyForCgi(cgi); } )) { params[pidx] = param.getAutomaticallyForCgi(cgi); } else static if(is(param == string)) { - auto ident = nextPieceFromSlash(remainingUrl); + ident = nextPieceFromSlash(remainingUrl); if(ident is null) { // trailing slash mandated on subresources cgi.setResponseLocation(cgi.pathInfo ~ "/"); @@ -8433,7 +8979,7 @@ private auto serveApiInternal(T)(string urlPrefix) { // a void return (or typeof(null) lol) means you, the user, is doing it yourself. Gives full control. try { auto ret = callFromCgi!(__traits(getOverloads, obj, methodName)[idx])(&(__traits(getOverloads, obj, methodName)[idx]), cgi); - presenter.presentSuccessfulReturnAsHtml(cgi, ret); + presenter.presentSuccessfulReturn(cgi, ret, presenter.methodMeta!(__traits(getOverloads, obj, methodName)[idx]), "html"); } catch(Throwable t) { presenter.presentExceptionAsHtml!(__traits(getOverloads, obj, methodName)[idx])(cgi, t, &(__traits(getOverloads, obj, methodName)[idx])); } @@ -8442,7 +8988,7 @@ private auto serveApiInternal(T)(string urlPrefix) { auto ret = callFromCgi!(__traits(getOverloads, obj, methodName)[idx])(&(__traits(getOverloads, obj, methodName)[idx]), cgi); static if(is(typeof(ret) == MultipleResponses!Types, Types...)) { var json; - static foreach(index, type; Types) { + foreach(index, type; Types) { if(ret.contains == index) json = ret.payload[index]; } @@ -8469,7 +9015,7 @@ private auto serveApiInternal(T)(string urlPrefix) { //return true; } } - }} + } case "script.js": cgi.setResponseContentType("text/javascript"); cgi.gzipResponse = true; @@ -8491,7 +9037,7 @@ private auto serveApiInternal(T)(string urlPrefix) { string defaultFormat(alias method)() { bool nonConstConditionForWorkingAroundASpuriousDmdWarning = true; - static foreach(attr; __traits(getAttributes, method)) { + foreach(attr; __traits(getAttributes, method)) { static if(is(typeof(attr) == DefaultFormat)) { if(nonConstConditionForWorkingAroundASpuriousDmdWarning) return attr.value; @@ -8500,11 +9046,16 @@ string defaultFormat(alias method)() { return "html"; } +struct Paginated(T) { + T[] items; + string nextPageUrl; +} + string[] urlNamesForMethod(alias method)(string def) { auto verb = Cgi.RequestMethod.GET; bool foundVerb = false; bool foundNoun = false; - static foreach(attr; __traits(getAttributes, method)) { + foreach(attr; __traits(getAttributes, method)) { static if(is(typeof(attr) == Cgi.RequestMethod)) { verb = attr; if(foundVerb) @@ -8883,7 +9434,7 @@ bool restObjectServeHandler(T, Presenter)(Cgi cgi, Presenter presenter, string u div.addClass("Dclass_" ~ T.stringof); div.dataset.url = urlId; bool first = true; - static foreach(idx, memberName; __traits(derivedMembers, T)) + foreach(idx, memberName; __traits(derivedMembers, T)) static if(__traits(compiles, __traits(getMember, obj, memberName).offsetof)) { if(!first) div.addChild("br"); else first = false; div.appendChild(presenter.formatReturnValueAsHtml(__traits(getMember, obj, memberName))); @@ -8893,7 +9444,7 @@ bool restObjectServeHandler(T, Presenter)(Cgi cgi, Presenter presenter, string u obj.toJsonFromReflection = delegate(t) { import arsd.jsvar; var v = var.emptyObject(); - static foreach(idx, memberName; __traits(derivedMembers, T)) + foreach(idx, memberName; __traits(derivedMembers, T)) static if(__traits(compiles, __traits(getMember, obj, memberName).offsetof)) { v[memberName] = __traits(getMember, obj, memberName); } @@ -8927,7 +9478,7 @@ bool restObjectServeHandler(T, Presenter)(Cgi cgi, Presenter presenter, string u static void applyChangesTemplate(Obj)(Cgi cgi, Obj obj) { - static foreach(idx, memberName; __traits(derivedMembers, Obj)) + foreach(idx, memberName; __traits(derivedMembers, Obj)) static if(__traits(compiles, __traits(getMember, obj, memberName).offsetof)) { __traits(getMember, obj, memberName) = cgi.request(memberName, __traits(getMember, obj, memberName)); } @@ -8996,7 +9547,7 @@ bool restObjectServeHandler(T, Presenter)(Cgi cgi, Presenter presenter, string u var json = var.emptyArray; foreach(r; results.results) { var o = var.emptyObject; - static foreach(idx, memberName; __traits(derivedMembers, typeof(r))) + foreach(idx, memberName; __traits(derivedMembers, typeof(r))) static if(__traits(compiles, __traits(getMember, r, memberName).offsetof)) { o[memberName] = __traits(getMember, r, memberName); } @@ -9163,6 +9714,8 @@ string contentTypeFromFileExtension(string filename) { return "text/css"; if(filename.endsWith(".js")) return "application/javascript"; + if(filename.endsWith(".wasm")) + return "application/wasm"; if(filename.endsWith(".mp3")) return "audio/mpeg"; return null; @@ -9358,7 +9911,7 @@ template dispatcher(definitions...) { bool dispatcher(DispatcherData)(DispatcherData dispatcherData) if(!is(DispatcherData : Cgi)) { // I can prolly make this more efficient later but meh. - static foreach(definition; definitions) { + foreach(definition; definitions) { if(definition.rejectFurther) { if(dispatcherData.cgi.pathInfo[dispatcherData.pathInfoStart .. $] == definition.urlPrefix) { auto ret = definition.handler( @@ -9387,6 +9940,55 @@ template dispatcher(definitions...) { }); +private struct StackBuffer { + char[1024] initial = void; + char[] buffer; + size_t position; + + this(int a) { + buffer = initial[]; + position = 0; + } + + void add(in char[] what) { + if(position + what.length > buffer.length) + buffer.length = position + what.length + 1024; // reallocate with GC to handle special cases + buffer[position .. position + what.length] = what[]; + position += what.length; + } + + void add(in char[] w1, in char[] w2, in char[] w3 = null) { + add(w1); + add(w2); + add(w3); + } + + void add(long v) { + char[16] buffer = void; + auto pos = buffer.length; + bool negative; + if(v < 0) { + negative = true; + v = -v; + } + do { + buffer[--pos] = cast(char) (v % 10 + '0'); + v /= 10; + } while(v); + + if(negative) + buffer[--pos] = '-'; + + auto res = buffer[pos .. $]; + + add(res[]); + } + + char[] get() @nogc { + return buffer[0 .. position]; + } +} + /+ /++ This is the beginnings of my web.d 2.0 - it dispatches web requests to a class object. -- cgit v1.2.3