summaryrefslogtreecommitdiffhomepage
path: root/src/ext_depends/arsd/core.d
diff options
context:
space:
mode:
authorRalph Amissah <ralph.amissah@gmail.com>2023-09-06 12:03:02 -0400
committerRalph Amissah <ralph.amissah@gmail.com>2023-09-06 12:03:02 -0400
commit3f5436a298df3349c0c301a767d24db08fb9129a (patch)
treeff9e919fd02b11353b0b2bc6355ee2243fb0dddb /src/ext_depends/arsd/core.d
parenthousekeeping, updates incl. ldc-1.34.0, dub-1.34.0 (diff)
arsd cgi.d updated
Diffstat (limited to 'src/ext_depends/arsd/core.d')
-rw-r--r--src/ext_depends/arsd/core.d849
1 files changed, 706 insertions, 143 deletions
diff --git a/src/ext_depends/arsd/core.d b/src/ext_depends/arsd/core.d
index 23a699d..49af24d 100644
--- a/src/ext_depends/arsd/core.d
+++ b/src/ext_depends/arsd/core.d
@@ -1,11 +1,25 @@
/++
- Please note: the api and behavior of this module is not externally stable at this time. See the documentation on specific functions.
+ $(PITFALL
+ Please note: the api and behavior of this module is not externally stable at this time. See the documentation on specific functions for details.
+ )
Shared core functionality including exception helpers, library loader, event loop, and possibly more. Maybe command line processor and uda helper and some basic shared annotation types.
- I'll probably move the url, websocket, and ssl stuff in here too as they are often shared. Maybe a small internationalization helper type (a hook for external implementation) and COM helpers too.
+ I'll probably move the url, websocket, and ssl stuff in here too as they are often shared. Maybe a small internationalization helper type (a hook for external implementation) and COM helpers too. I might move the process helpers out to their own module - even things in here are not considered stable to library users at this time!
+
+ If you use this directly outside the arsd library despite its current instability caveats, you might consider using `static import` since names in here are likely to clash with Phobos if you use them together. `static import` will let you easily disambiguate and avoid name conflict errors if I add more here. Some names even clash deliberately to remind me to avoid some antipatterns inside the arsd modules!
+
+ ## Contributor notes
+
+ arsd.core should be focused on things that enable interoperability primarily and secondarily increased code quality between other, otherwise independent arsd modules. As a foundational library, it is not permitted to import anything outside the druntime `core` namespace, except in templates and examples not normally compiled in. This keeps it independent and avoids transitive dependency spillover to end users while also keeping compile speeds fast. To help keep builds snappy, also avoid significant use of ctfe inside this module.
+
+ On my linux computer, `dmd -unittest -main core.d` takes about a quarter second to run. We do not want this to grow.
- If you use this directly outside the arsd library, you might consider using `static import` since names in here are likely to clash with Phobos if you use them together. `static import` will let you easily disambiguate and avoid name conflict errors if I add more here. Some names even clash deliberately to remind me to avoid some antipatterns inside the arsd modules!
+ `@safe` compatibility is ok when it isn't too big of a hassle. `@nogc` is a non-goal. I might accept it on some of the trivial functions but if it means changing the logic in any way to support, you will need a compelling argument to justify it. The arsd libs are supposed to be reliable and easy to use. That said, of course, don't be unnecessarily wasteful - if you can easily provide a reliable and easy to use way to let advanced users do their thing without hurting the other cases, let's discuss it.
+
+ If functionality is not needed by multiple existing arsd modules, consider adding a new module instead of adding it to the core.
+
+ Unittests should generally be hidden behind a special version guard so they don't interfere with end user tests.
History:
Added March 2023 (dub v11.0). Several functions were migrated in here at that time, noted individually. Members without a note were added with the module.
@@ -47,7 +61,9 @@ version(Windows) {
} else version(linux) {
version=Arsd_core_epoll;
- version=Arsd_core_has_cloexec;
+ static if(__VERSION__ >= 2098) {
+ version=Arsd_core_has_cloexec;
+ }
} else version(FreeBSD) {
version=Arsd_core_kqueue;
@@ -884,11 +900,17 @@ unittest {
+/
nothrow @safe @nogc pure
inout(char)[] stripInternal(return inout(char)[] s) {
+ bool isAllWhitespace = true;
foreach(i, char c; s)
if(c != ' ' && c != '\t' && c != '\n' && c != '\r') {
s = s[i .. $];
+ isAllWhitespace = false;
break;
}
+
+ if(isAllWhitespace)
+ return s[$..$];
+
for(int a = cast(int)(s.length - 1); a > 0; a--) {
char c = s[a];
if(c != ' ' && c != '\t' && c != '\n' && c != '\r') {
@@ -900,15 +922,19 @@ inout(char)[] stripInternal(return inout(char)[] s) {
return s;
}
+/// ditto
nothrow @safe @nogc pure
inout(char)[] stripRightInternal(return inout(char)[] s) {
- for(int a = cast(int)(s.length - 1); a > 0; a--) {
- char c = s[a];
+ bool isAllWhitespace = true;
+ foreach_reverse(a, c; s) {
if(c != ' ' && c != '\t' && c != '\n' && c != '\r') {
s = s[0 .. a + 1];
+ isAllWhitespace = false;
break;
}
}
+ if(isAllWhitespace)
+ s = s[0..0];
return s;
@@ -1372,6 +1398,10 @@ class InvalidArgumentsException : ArsdExceptionBase {
], functionName, file, line, next);
}
+ this(string argumentName, string argumentDescription, string functionName = __PRETTY_FUNCTION__, string file = __FILE__, size_t line = __LINE__, Throwable next = null) {
+ this(argumentName, argumentDescription, LimitedVariant.init, functionName, file, line, next);
+ }
+
override void getAdditionalPrintableInformation(scope void delegate(string name, in char[] value) sink) const {
// FIXME: print the details better
foreach(arg; invalidArguments)
@@ -1841,7 +1871,7 @@ enum ThreadToRunIn {
Ad-Hoc thread - something running an event loop that isn't another thing
Controller thread - running an explicit event loop instance set as not a task runner or blocking worker
- UI thread - simpledisplay's event loop, which it will require remain live for the duration of the program (running two .eventLoops without a parent EventLoop instance will become illegal, throwing at runtime if it happens telling people to change their code
+ UI thread - simpledisplay's event loop, which it will require remain live for the duration of the program (running two .eventLoops without a parent EventLoop instance will become illegal, throwing at runtime if it happens telling people to change their code)
Windows HANDLES will always be listened on the thread itself that is requesting, UNLESS it is a worker/helper thread, in which case it goes to a coordinator thread. since it prolly can't rely on the parent per se this will have to be one created by arsd core init, UNLESS the parent is inside an explicit EventLoop structure.
@@ -2543,6 +2573,8 @@ class AsyncFile : AbstractFile {
Reads or writes a file in one call. It might internally yield, but is generally blocking if it returns values. The callback ones depend on the implementation.
Tip: prefer the callback ones. If settings where async is possible, it will do async, and if not, it will sync.
+
+ NOT IMPLEMENTED
+/
void writeFile(string filename, const(void)[] contents) {
@@ -2733,98 +2765,328 @@ class NamedPipeServer {
// can be on a specific thread or on any thread
}
+private version(Windows) extern(Windows) {
+ const(char)* inet_ntop(int, const void*, char*, socklen_t);
+}
+
/++
- Looking these up might be done asynchronously. The objects both represent an async request and its result, which is the actual address the operating system uses.
+ Some functions that return arrays allow you to provide your own buffer. These are indicated in the type system as `UserProvidedBuffer!Type`, and you get to decide what you want to happen if the buffer is too small via the [OnOutOfSpace] parameter.
+
+ These are usually optional, since an empty user provided buffer with the default policy of reallocate will also work fine for whatever needs to be returned, thanks to the garbage collector taking care of it for you.
- When you create an address, it holds a request. You can call `start` and `waitForCompletion` like with other async requests. The request may be run in a helper thread.
+ The API inside `UserProvidedBuffer` is all private to the arsd library implementation; your job is just to provide the buffer to it with [provideBuffer] or a constructor call and decide on your on-out-of-space policy.
- Unlike most the async objects though, its methods will implicitly call `waitForCompletion`.
+ $(TIP
+ To properly size a buffer, I suggest looking at what covers about 80% of cases. Trying to cover everything often leads to wasted buffer space, and if you use a reallocate policy it can cover the rest. You might be surprised how far just two elements can go!
+ )
- Note that The current implementation just blocks.
+ History:
+ Added August 4, 2023 (dub v11.0)
+/
-class SocketAddress /* : AsyncOperationRequest, AsyncOperationResponse */ {
- // maybe accept url?
- // unix:///home/me/thing
- // ip://0.0.0.0:4555
- // ipv6://[00:00:00:00:00:00]
-
- // address info
- abstract int domain();
- // FIXME: find all cases of this and make sure it is completed first
- abstract sockaddr* rawAddr();
- abstract socklen_t rawAddrLength();
+struct UserProvidedBuffer(T) {
+ private T[] buffer;
+ private int actualLength;
+ private OnOutOfSpace policy;
- /+
- // request interface
- abstract void start();
- abstract SocketAddress waitForCompletion();
- abstract bool isComplete();
+ /++
- // response interface
- abstract bool wasSuccessful();
+/
+ public this(scope T[] buffer, OnOutOfSpace policy = OnOutOfSpace.reallocate) {
+ this.buffer = buffer;
+ this.policy = policy;
+ }
+
+ package(arsd) bool append(T item) {
+ if(actualLength < buffer.length) {
+ buffer[actualLength++] = item;
+ return true;
+ } else final switch(policy) {
+ case OnOutOfSpace.discard:
+ return false;
+ case OnOutOfSpace.exception:
+ throw ArsdException!"Buffer out of space"(buffer.length, actualLength);
+ case OnOutOfSpace.reallocate:
+ buffer ~= item;
+ actualLength++;
+ return true;
+ }
+ }
+
+ package(arsd) T[] slice() return {
+ return buffer[0 .. actualLength];
+ }
}
-/+
-class BluetoothAddress : SocketAddress {
- // FIXME it is AF_BLUETOOTH
- // see: https://people.csail.mit.edu/albert/bluez-intro/x79.html
- // see: https://learn.microsoft.com/en-us/windows/win32/Bluetooth/bluetooth-programming-with-windows-sockets
+/// ditto
+UserProvidedBuffer!T provideBuffer(T)(scope T[] buffer, OnOutOfSpace policy = OnOutOfSpace.reallocate) {
+ return UserProvidedBuffer!T(buffer, policy);
+}
+
+/++
+ Possible policies for [UserProvidedBuffer]s that run out of space.
++/
+enum OnOutOfSpace {
+ reallocate, /// reallocate the buffer with the GC to make room
+ discard, /// discard all contents that do not fit in your provided buffer
+ exception, /// throw an exception if there is data that would not fit in your provided buffer
}
+
+
+/++
+ For functions that give you an unknown address, you can use this to hold it.
+
+ Can get:
+ ip4
+ ip6
+ unix
+ abstract_
+
+ name lookup for connect (stream or dgram)
+ request canonical name?
+
+ interface lookup for bind (stream or dgram)
+/
+struct SocketAddress {
+ import core.sys.posix.netdb;
-version(Posix) // FIXME: find the sockaddr_un definition for Windows too and add it in
-final class UnixAddress : SocketAddress {
- sockaddr_un address;
+ /++
+ Provides the set of addresses to listen on all supported protocols on the machine for the given interfaces. `localhost` only listens on the loopback interface, whereas `allInterfaces` will listen on loopback as well as the others on the system (meaning it may be publicly exposed to the internet).
- override int domain() {
- return AF_UNIX;
+ If you provide a buffer, I recommend using one of length two, so `SocketAddress[2]`, since this usually provides one address for ipv4 and one for ipv6.
+ +/
+ static SocketAddress[] localhost(ushort port, return UserProvidedBuffer!SocketAddress buffer = null) {
+ buffer.append(ip6("::1", port));
+ buffer.append(ip4("127.0.0.1", port));
+ return buffer.slice;
}
- override sockaddr* rawAddr() {
- return cast(sockaddr*) &address;
+ /// ditto
+ static SocketAddress[] allInterfaces(ushort port, return UserProvidedBuffer!SocketAddress buffer = null) {
+ char[16] str;
+ return allInterfaces(intToString(port, str[]), buffer);
}
- override socklen_t rawAddrLength() {
- return address.sizeof;
+
+ /// ditto
+ static SocketAddress[] allInterfaces(scope const char[] serviceOrPort, return UserProvidedBuffer!SocketAddress buffer = null) {
+ addrinfo hints;
+ hints.ai_flags = AI_PASSIVE;
+ hints.ai_socktype = SOCK_STREAM; // just to filter it down a little tbh
+ return get(null, serviceOrPort, &hints, buffer);
}
-}
-final class IpAddress : SocketAddress {
- sockaddr_in address;
+ /++
+ Returns a single address object for the given protocol and parameters.
- override int domain() {
- return AF_INET;
+ You probably should generally prefer [get], [localhost], or [allInterfaces] to have more flexible code.
+ +/
+ static SocketAddress ip4(scope const char[] address, ushort port, bool forListening = false) {
+ return getSingleAddress(AF_INET, AI_NUMERICHOST | (forListening ? AI_PASSIVE : 0), address, port);
}
- override sockaddr* rawAddr() {
- return cast(sockaddr*) &address;
+ /// ditto
+ static SocketAddress ip4(ushort port) {
+ return ip4(null, port, true);
}
- override socklen_t rawAddrLength() {
- return address.sizeof;
+
+ /// ditto
+ static SocketAddress ip6(scope const char[] address, ushort port, bool forListening = false) {
+ return getSingleAddress(AF_INET6, AI_NUMERICHOST | (forListening ? AI_PASSIVE : 0), address, port);
+ }
+
+ /// ditto
+ static SocketAddress ip6(ushort port) {
+ return ip6(null, port, true);
+ }
+
+ /// ditto
+ static SocketAddress unix(scope const char[] path) {
+ // FIXME
+ SocketAddress addr;
+ return addr;
+ }
+
+ /// ditto
+ static SocketAddress abstract_(scope const char[] path) {
+ char[190] buffer = void;
+ buffer[0] = 0;
+ buffer[1 .. path.length] = path[];
+ return unix(buffer[0 .. 1 + path.length]);
+ }
+
+ private static SocketAddress getSingleAddress(int family, int flags, scope const char[] address, ushort port) {
+ addrinfo hints;
+ hints.ai_family = family;
+ hints.ai_flags = flags;
+
+ char[16] portBuffer;
+ char[] portString = intToString(port, portBuffer[]);
+
+ SocketAddress[1] addr;
+ auto res = get(address, portString, &hints, provideBuffer(addr[]));
+ if(res.length == 0)
+ throw ArsdException!"bad address"(address.idup, port);
+ return res[0];
}
-}
-final class Ipv6Address : SocketAddress {
- sockaddr_in6 address;
+ /++
+ Calls `getaddrinfo` and returns the array of results. It will populate the data into the buffer you provide, if you provide one, otherwise it will allocate its own.
+ +/
+ static SocketAddress[] get(scope const char[] nodeName, scope const char[] serviceOrPort, addrinfo* hints = null, return UserProvidedBuffer!SocketAddress buffer = null, scope bool delegate(scope addrinfo* ai) filter = null) @trusted {
+ addrinfo* res;
+ CharzBuffer node = nodeName;
+ CharzBuffer service = serviceOrPort;
+ auto ret = getaddrinfo(nodeName is null ? null : node.ptr, serviceOrPort is null ? null : service.ptr, hints, &res);
+ if(ret == 0) {
+ auto current = res;
+ while(current) {
+ if(filter is null || filter(current)) {
+ SocketAddress addr;
+ addr.addrlen = cast(socklen_t) current.ai_addrlen;
+ switch(current.ai_family) {
+ case AF_INET:
+ addr.in4 = * cast(sockaddr_in*) current.ai_addr;
+ break;
+ case AF_INET6:
+ addr.in6 = * cast(sockaddr_in6*) current.ai_addr;
+ break;
+ case AF_UNIX:
+ addr.unix_address = * cast(sockaddr_un*) current.ai_addr;
+ break;
+ default:
+ // skip
+ }
+
+ if(!buffer.append(addr))
+ break;
+ }
+
+ current = current.ai_next;
+ }
+
+ freeaddrinfo(res);
+ } else {
+ version(Windows) {
+ throw new WindowsApiException("getaddrinfo", ret);
+ } else {
+ const char* error = gai_strerror(ret);
+ }
+ }
+
+ return buffer.slice;
+ }
+
+ /++
+ Returns a string representation of the address that identifies it in a custom format.
+
+ $(LIST
+ * Unix domain socket addresses are their path prefixed with "unix:", unless they are in the abstract namespace, in which case it is prefixed with "abstract:" and the zero is trimmed out. For example, "unix:/tmp/pipe".
- override int domain() {
- return AF_INET6;
+ * IPv4 addresses are written in dotted decimal followed by a colon and the port number. For example, "127.0.0.1:8080".
+
+ * IPv6 addresses are written in colon separated hex format, but enclosed in brackets, then followed by the colon and port number. For example, "[::1]:8080".
+ )
+ +/
+ string toString() const @trusted {
+ char[200] buffer;
+ switch(address.sa_family) {
+ case AF_INET:
+ auto writable = stringz(inet_ntop(address.sa_family, &in4.sin_addr, buffer.ptr, buffer.length));
+ auto it = writable.borrow;
+ buffer[it.length] = ':';
+ auto numbers = intToString(port, buffer[it.length + 1 .. $]);
+ return buffer[0 .. it.length + 1 + numbers.length].idup;
+ case AF_INET6:
+ buffer[0] = '[';
+ auto writable = stringz(inet_ntop(address.sa_family, &in6.sin6_addr, buffer.ptr + 1, buffer.length - 1));
+ auto it = writable.borrow;
+ buffer[it.length + 1] = ']';
+ buffer[it.length + 2] = ':';
+ auto numbers = intToString(port, buffer[it.length + 3 .. $]);
+ return buffer[0 .. it.length + 3 + numbers.length].idup;
+ case AF_UNIX:
+ // FIXME: it might be abstract in which case stringz is wrong!!!!!
+ auto writable = stringz(cast(char*) unix_address.sun_path.ptr).borrow;
+ if(writable.length == 0)
+ return "unix:";
+ string prefix = writable[0] == 0 ? "abstract:" : "unix:";
+ buffer[0 .. prefix.length] = prefix[];
+ buffer[prefix.length .. prefix.length + writable.length] = writable[writable[0] == 0 ? 1 : 0 .. $];
+ return buffer.idup;
+ case AF_UNSPEC:
+ return "<unspecified address>";
+ default:
+ return "<unsupported address>"; // FIXME
+ }
+ }
+
+ ushort port() const @trusted {
+ switch(address.sa_family) {
+ case AF_INET:
+ return ntohs(in4.sin_port);
+ case AF_INET6:
+ return ntohs(in6.sin6_port);
+ default:
+ return 0;
+ }
+ }
+
+ /+
+ @safe unittest {
+ SocketAddress[4] buffer;
+ foreach(addr; SocketAddress.get("arsdnet.net", "http", null, provideBuffer(buffer[])))
+ writeln(addr.toString());
}
+ +/
- override sockaddr* rawAddr() {
- return cast(sockaddr*) &address;
+ /+
+ unittest {
+ // writeln(SocketAddress.ip4(null, 4444, true));
+ // writeln(SocketAddress.ip4("400.3.2.1", 4444));
+ // writeln(SocketAddress.ip4("bar", 4444));
+ foreach(addr; localhost(4444))
+ writeln(addr.toString());
}
- override socklen_t rawAddrLength() {
- return address.sizeof;
+ +/
+
+ socklen_t addrlen = typeof(this).sizeof - socklen_t.sizeof; // the size of the union below
+
+ union {
+ sockaddr address;
+
+ sockaddr_storage storage;
+
+ sockaddr_in in4;
+ sockaddr_in6 in6;
+
+ sockaddr_un unix_address;
}
+
+ /+
+ this(string node, string serviceOrPort, int family = 0) {
+ // need to populate the approrpiate address and the length and make sure you set sa_family
+ }
+ +/
+
+ int domain() {
+ return address.sa_family;
+ }
+ sockaddr* rawAddr() return {
+ return &address;
+ }
+ socklen_t rawAddrLength() {
+ return addrlen;
+ }
+
+ // FIXME it is AF_BLUETOOTH
+ // see: https://people.csail.mit.edu/albert/bluez-intro/x79.html
+ // see: https://learn.microsoft.com/en-us/windows/win32/Bluetooth/bluetooth-programming-with-windows-sockets
}
-/++
- For functions that give you an unknown address, you can use this to hold it.
-+/
-struct SocketAddressBuffer {
- sockaddr address;
- socklen_t addrlen;
+private version(Windows) {
+ struct sockaddr_un {
+ ushort sun_family;
+ char[108] sun_path;
+ }
}
class AsyncSocket : AsyncFile {
@@ -2875,6 +3137,11 @@ class AsyncSocket : AsyncFile {
setCloExec(handle);
}
+ if(address.domain == AF_INET6) {
+ int opt = 1;
+ setsockopt(handle, IPPROTO_IPV6 /*SOL_IPV6*/, IPV6_V6ONLY, &opt, opt.sizeof);
+ }
+
// FIXME: chekc for broadcast
// FIXME: REUSEADDR ?
@@ -2945,8 +3212,8 @@ class AsyncSocket : AsyncFile {
/++
You can also construct your own request externally to control the memory more.
+/
- AsyncConnectRequest connect(SocketAddress address) {
- return new AsyncConnectRequest(this, address);
+ AsyncConnectRequest connect(SocketAddress address, ubyte[] bufferToSend = null) {
+ return new AsyncConnectRequest(this, address, bufferToSend);
}
/++
@@ -2975,25 +3242,29 @@ class AsyncSocket : AsyncFile {
/++
You can also construct your own request externally to control the memory more.
+/
- AsyncSendRequest sendTo(const(ubyte)[] buffer, SocketAddress address, int flags = 0) {
+ AsyncSendRequest sendTo(const(ubyte)[] buffer, SocketAddress* address, int flags = 0) {
return new AsyncSendRequest(this, buffer, address, flags);
}
/++
You can also construct your own request externally to control the memory more.
+/
- AsyncReceiveRequest receiveFrom(ubyte[] buffer, SocketAddressBuffer* address, int flags = 0) {
+ AsyncReceiveRequest receiveFrom(ubyte[] buffer, SocketAddress* address, int flags = 0) {
return new AsyncReceiveRequest(this, buffer, address, flags);
}
/++
+/
SocketAddress localAddress() {
- return null; // FIXME
+ SocketAddress addr;
+ getsockname(handle, &addr.address, &addr.addrlen);
+ return addr;
}
/++
+/
SocketAddress peerAddress() {
- return null; // FIXME
+ SocketAddress addr;
+ getpeername(handle, &addr.address, &addr.addrlen);
+ return addr;
}
// for unix sockets on unix only: send/receive fd, get peer creds
@@ -3009,9 +3280,17 @@ class AsyncSocket : AsyncFile {
}
/++
+ Initiates a connection request and optionally sends initial data as soon as possible.
+
+ Calls `ConnectEx` on Windows and emulates it on other systems.
+
+ The entire buffer is sent before the operation is considered complete.
+
+ NOT IMPLEMENTED / NOT STABLE
+/
class AsyncConnectRequest : AsyncOperationRequest {
- this(AsyncSocket socket, SocketAddress address) {
+ // FIXME: i should take a list of addresses and take the first one that succeeds, so a getaddrinfo can be sent straight in.
+ this(AsyncSocket socket, SocketAddress address, ubyte[] dataToWrite) {
}
@@ -3035,17 +3314,67 @@ class AsyncConnectResponse : AsyncOperationResponse {
}
+// FIXME: TransmitFile/sendfile support
+
/++
+ Calls `AcceptEx` on Windows and emulates it on other systems.
+
+ NOT IMPLEMENTED / NOT STABLE
+/
class AsyncAcceptRequest : AsyncOperationRequest {
- this(AsyncSocket socket) {
-
- }
+ AsyncSocket socket;
override void start() {}
override void cancel() {}
override bool isComplete() { return true; }
override AsyncConnectResponse waitForCompletion() { assert(0); }
+
+
+ struct LowLevelOperation {
+ AsyncSocket file;
+ ubyte[] buffer;
+ SocketAddress* address;
+
+ this(typeof(this.tupleof) args) {
+ this.tupleof = args;
+ }
+
+ version(Windows) {
+ auto opCall(OVERLAPPED* overlapped, LPOVERLAPPED_COMPLETION_ROUTINE ocr) {
+ WSABUF buf;
+ buf.len = cast(int) buffer.length;
+ buf.buf = cast(typeof(buf.buf)) buffer.ptr;
+
+ uint flags;
+
+ if(address is null)
+ return WSARecv(file.handle, &buf, 1, null, &flags, overlapped, ocr);
+ else {
+ return WSARecvFrom(file.handle, &buf, 1, null, &flags, &(address.address), &(address.addrlen), overlapped, ocr);
+ }
+ }
+ } else {
+ auto opCall() {
+ int flags;
+ if(address is null)
+ return core.sys.posix.sys.socket.recv(file.handle, buffer.ptr, buffer.length, flags);
+ else
+ return core.sys.posix.sys.socket.recvfrom(file.handle, buffer.ptr, buffer.length, flags, &(address.address), &(address.addrlen));
+ }
+ }
+
+ string errorString() {
+ return "Receive";
+ }
+ }
+ mixin OverlappedIoRequest!(AsyncAcceptResponse, LowLevelOperation);
+
+ this(AsyncSocket socket, ubyte[] buffer = null, SocketAddress* address = null) {
+ llo = LowLevelOperation(socket, buffer, address);
+ this.response = typeof(this.response).defaultConstructed;
+ }
+
+ // can also look up the local address
}
/++
+/
@@ -3053,6 +3382,10 @@ class AsyncAcceptResponse : AsyncOperationResponse {
AsyncSocket newSocket;
const SystemErrorCode errorCode;
+ this(SystemErrorCode errorCode, ubyte[] buffer) {
+ this.errorCode = errorCode;
+ }
+
this(AsyncSocket newSocket, SystemErrorCode errorCode) {
this.newSocket = newSocket;
this.errorCode = errorCode;
@@ -3070,7 +3403,7 @@ class AsyncReceiveRequest : AsyncOperationRequest {
AsyncSocket file;
ubyte[] buffer;
int flags;
- SocketAddressBuffer* address;
+ SocketAddress* address;
this(typeof(this.tupleof) args) {
this.tupleof = args;
@@ -3105,7 +3438,7 @@ class AsyncReceiveRequest : AsyncOperationRequest {
}
mixin OverlappedIoRequest!(AsyncReceiveResponse, LowLevelOperation);
- this(AsyncSocket socket, ubyte[] buffer, SocketAddressBuffer* address, int flags) {
+ this(AsyncSocket socket, ubyte[] buffer, SocketAddress* address, int flags) {
llo = LowLevelOperation(socket, buffer, flags, address);
this.response = typeof(this.response).defaultConstructed;
}
@@ -3134,7 +3467,7 @@ class AsyncSendRequest : AsyncOperationRequest {
AsyncSocket file;
const(ubyte)[] buffer;
int flags;
- SocketAddress address;
+ SocketAddress* address;
this(typeof(this.tupleof) args) {
this.tupleof = args;
@@ -3167,7 +3500,7 @@ class AsyncSendRequest : AsyncOperationRequest {
}
mixin OverlappedIoRequest!(AsyncSendResponse, LowLevelOperation);
- this(AsyncSocket socket, const(ubyte)[] buffer, SocketAddress address, int flags) {
+ this(AsyncSocket socket, const(ubyte)[] buffer, SocketAddress* address, int flags) {
llo = LowLevelOperation(socket, buffer, flags, address);
this.response = typeof(this.response).defaultConstructed;
}
@@ -3191,22 +3524,58 @@ class AsyncSendResponse : AsyncOperationResponse {
}
/++
- A socket bound and ready to accept connections.
+ A set of sockets bound and ready to accept connections on worker threads.
+
+ Depending on the specified address, it can be tcp, tcpv6, unix domain, or all of the above.
- Depending on the specified address, it can be tcp, tcpv6, or unix domain.
+ NOT IMPLEMENTED / NOT STABLE
+/
class StreamServer {
- this(SocketAddress listenTo) {
+ AsyncSocket[] sockets;
+ this(SocketAddress[] listenTo, int backlog = 8) {
+ foreach(listen; listenTo) {
+ auto socket = new AsyncSocket(listen, SOCK_STREAM);
+
+ // FIXME: allInterfaces for ipv6 also covers ipv4 so the bind can fail...
+ // so we have to permit it to fail w/ address in use if we know we already
+ // are listening to ipv6
+
+ // or there is a setsockopt ipv6 only thing i could set.
+
+ socket.bind(listen);
+ socket.listen(backlog);
+ sockets ~= socket;
+
+ // writeln(socket.localAddress.port);
+ }
+
+ // i have to start accepting on each thread for each socket...
}
// when a new connection arrives, it calls your callback
// can be on a specific thread or on any thread
+
+
+ void start() {
+ foreach(socket; sockets) {
+ auto request = socket.accept();
+ request.start();
+ }
+ }
+}
+
+/+
+unittest {
+ auto ss = new StreamServer(SocketAddress.localhost(0));
}
++/
/++
A socket bound and ready to use receiveFrom
Depending on the address, it can be udp or unix domain.
+
+ NOT IMPLEMENTED / NOT STABLE
+/
class DatagramListener {
// whenever a udp message arrives, it calls your callback
@@ -3407,6 +3776,7 @@ unittest {
// dispatches change event to either your thread or maybe the any task` queue.
/++
+ PARTIALLY IMPLEMENTED / NOT STABLE
+/
class DirectoryWatcher {
@@ -3965,11 +4335,158 @@ class AsyncReadResponse : AsyncOperationResponse {
runHelperFunction() - whomever it reports to is the parent
+/
-/+
-class Task : Fiber {
+class ScheduableTask : Fiber {
+ private void delegate() dg;
+
+ // linked list stuff
+ private static ScheduableTask taskRoot;
+ private ScheduableTask previous;
+ private ScheduableTask next;
+
+ // need the controlling thread to know how to wake it up if it receives a message
+ private Thread controllingThread;
+
+ // the api
+
+ this(void delegate() dg) {
+ assert(dg !is null);
+
+ this.dg = dg;
+ super(&taskRunner);
+
+ if(taskRoot !is null) {
+ this.next = taskRoot;
+ taskRoot.previous = this;
+ }
+ taskRoot = this;
+ }
+
+ /+
+ enum BehaviorOnCtrlC {
+ ignore,
+ cancel,
+ deliverMessage
+ }
+ +/
+ private bool cancelled;
+
+ public void cancel() {
+ this.cancelled = true;
+ // if this is running, we can throw immediately
+ // otherwise if we're calling from an appropriate thread, we can call it immediately
+ // otherwise we need to queue a wakeup to its own thread.
+ // tbh we should prolly just queue it every time
+ }
+
+ private void taskRunner() {
+ try {
+ dg();
+ } catch(TaskCancelledException tce) {
+ // this space intentionally left blank;
+ // the purpose of this exception is to just
+ // let the fiber's destructors run before we
+ // let it die.
+ } catch(Throwable t) {
+ if(taskUncaughtException is null) {
+ throw t;
+ } else {
+ taskUncaughtException(t);
+ }
+ } finally {
+ if(this is taskRoot) {
+ taskRoot = taskRoot.next;
+ if(taskRoot !is null)
+ taskRoot.previous = null;
+ } else {
+ assert(this.previous !is null);
+ assert(this.previous.next is this);
+ this.previous.next = this.next;
+ if(this.next !is null)
+ this.next.previous = this.previous;
+ }
+ }
+ }
}
+
+/++
+
+/
+void delegate(Throwable t) taskUncaughtException;
+
+/++
+ Gets an object that lets you control a schedulable task (which is a specialization of a fiber) and can be used in an `if` statement.
+
+ ---
+ if(auto controller = inSchedulableTask()) {
+ controller.yieldUntilReadable(...);
+ }
+ ---
+
+ History:
+ Added August 11, 2023 (dub v11.1)
++/
+SchedulableTaskController inSchedulableTask() {
+ import core.thread.fiber;
+
+ if(auto fiber = Fiber.getThis) {
+ return SchedulableTaskController(cast(ScheduableTask) fiber);
+ }
+
+ return SchedulableTaskController(null);
+}
+
+/// ditto
+struct SchedulableTaskController {
+ private this(ScheduableTask fiber) {
+ this.fiber = fiber;
+ }
+
+ private ScheduableTask fiber;
+
+ /++
+
+ +/
+ bool opCast(T : bool)() {
+ return fiber !is null;
+ }
+
+ /++
+
+ +/
+ version(Posix)
+ void yieldUntilReadable(NativeFileHandle handle) {
+ assert(fiber !is null);
+
+ auto cb = new CallbackHelper(() { fiber.call(); });
+
+ // FIXME: if the fd is already registered in this thread it can throw...
+ version(Windows)
+ auto rearmToken = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(handle, cb);
+ else
+ auto rearmToken = getThisThreadEventLoop().addCallbackOnFdReadableOneShot(handle, cb);
+
+ // FIXME: this is only valid if the fiber is only ever going to run in this thread!
+ fiber.yield();
+
+ rearmToken.unregister();
+
+ // what if there are other messages, like a ctrl+c?
+ if(fiber.cancelled)
+ throw new TaskCancelledException();
+ }
+
+ version(Windows)
+ void yieldUntilSignaled(NativeFileHandle handle) {
+ // add it to the WaitForMultipleObjects thing w/ a cb
+ }
+}
+
+class TaskCancelledException : object.Exception {
+ this() {
+ super("Task cancelled");
+ }
+}
private class CoreWorkerThread : Thread {
this(EventLoopType type) {
@@ -3987,7 +4504,13 @@ private class CoreWorkerThread : Thread {
atomicOp!"-="(runningCount, 1);
}
- eventLoop.run(() => true);
+ eventLoop.run(() => cancelled);
+ }
+
+ private bool cancelled;
+
+ void cancel() {
+ cancelled = true;
}
EventLoopType type;
@@ -4030,6 +4553,14 @@ private class CoreWorkerThread : Thread {
started = true;
}
}
+
+ void cancelAll() {
+ foreach(runner; taskRunners)
+ runner.cancel();
+ foreach(runner; helperRunners)
+ runner.cancel();
+
+ }
}
}
@@ -4055,6 +4586,7 @@ private int numberOfCpus() {
Its destructor runs the event loop then waits to for the workers to finish to clean them up.
+/
+// FIXME: single instance?
struct ArsdCoreApplication {
private ICoreEventLoop impl;
@@ -4085,21 +4617,25 @@ struct ArsdCoreApplication {
@disable new();
~this() {
- run();
+ if(!alreadyRun)
+ run();
exitApplication();
waitForWorkersToExit(3000);
}
void exitApplication() {
-
+ CoreWorkerThread.cancelAll();
}
void waitForWorkersToExit(int timeoutMilliseconds) {
}
+ private bool alreadyRun;
+
void run() {
- impl.run(() => true);
+ impl.run(() => false);
+ alreadyRun = true;
}
}
@@ -4941,7 +5477,7 @@ class WritableStream {
/++
+/
- final void put(T)(T value, ByteOrder byteOrder = ByteOrder.irrelevant) {
+ final void put(T)(T value, ByteOrder byteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
static if(T.sizeof == 8)
ulong b;
else static if(T.sizeof == 4)
@@ -4953,7 +5489,7 @@ class WritableStream {
else static assert(0, "unimplemented type, try using just the basic types");
if(byteOrder == ByteOrder.irrelevant && T.sizeof > 1)
- throw new InvalidArgumentsException("byteOrder", "byte order must be specified for type " ~ T.stringof ~ " because it is bigger than one byte");
+ throw new InvalidArgumentsException("byteOrder", "byte order must be specified for type " ~ T.stringof ~ " because it is bigger than one byte", "WritableStream.put", file, line);
final switch(byteOrder) {
case ByteOrder.irrelevant:
@@ -4976,9 +5512,9 @@ class WritableStream {
}
/// ditto
- final void put(T : E[], E)(T value, ByteOrder elementByteOrder = ByteOrder.irrelevant) {
+ final void put(T : E[], E)(T value, ByteOrder elementByteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
foreach(item; value)
- put(item, elementByteOrder);
+ put(item, elementByteOrder, file, line);
}
/++
@@ -5051,9 +5587,9 @@ class ReadableStream {
ubyte[] data = stream.get!(ubyte[])(i);
---
+/
- final T get(T)(ByteOrder byteOrder = ByteOrder.irrelevant) {
+ final T get(T)(ByteOrder byteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
if(byteOrder == ByteOrder.irrelevant && T.sizeof > 1)
- throw new InvalidArgumentsException("byteOrder", "byte order must be specified for type " ~ T.stringof ~ " because it is bigger than one byte");
+ throw new InvalidArgumentsException("byteOrder", "byte order must be specified for type " ~ T.stringof ~ " because it is bigger than one byte", "ReadableStream.get", file, line);
// FIXME: what if it is a struct?
@@ -5093,9 +5629,9 @@ class ReadableStream {
}
/// ditto
- final T get(T : E[], E)(size_t length, ByteOrder elementByteOrder = ByteOrder.irrelevant) {
+ final T get(T : E[], E)(size_t length, ByteOrder elementByteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
if(elementByteOrder == ByteOrder.irrelevant && E.sizeof > 1)
- throw new InvalidArgumentsException("elementByteOrder", "byte order must be specified for type " ~ E.stringof ~ " because it is bigger than one byte");
+ throw new InvalidArgumentsException("elementByteOrder", "byte order must be specified for type " ~ E.stringof ~ " because it is bigger than one byte", "ReadableStream.get", file, line);
// if the stream is closed before getting the length or the terminator, should we send partial stuff
// or just throw?
@@ -5120,9 +5656,9 @@ class ReadableStream {
}
/// ditto
- final T get(T : E[], E)(scope bool delegate(E e) isTerminatingSentinel, ByteOrder elementByteOrder = ByteOrder.irrelevant) {
+ final T get(T : E[], E)(scope bool delegate(E e) isTerminatingSentinel, ByteOrder elementByteOrder = ByteOrder.irrelevant, string file = __FILE__, size_t line = __LINE__) {
if(byteOrder == ByteOrder.irrelevant && E.sizeof > 1)
- throw new InvalidArgumentsException("elementByteOrder", "byte order must be specified for type " ~ E.stringof ~ " because it is bigger than one byte");
+ throw new InvalidArgumentsException("elementByteOrder", "byte order must be specified for type " ~ E.stringof ~ " because it is bigger than one byte", "ReadableStream.get", file, line);
assert(0, "Not implemented");
}
@@ -5234,6 +5770,8 @@ unittest {
}
/++
+ UNSTABLE, NOT FULLY IMPLEMENTED. DO NOT USE YET.
+
You might use this like:
---
@@ -5256,11 +5794,12 @@ unittest {
proc.start();
---
- Please note that this does not currently and I have no plans as of this writing to add support for any kind of direct file descriptor passing. It always pipes them back to the parent for processing. If you don't want this, call the lower level functions yourself; the reason this class is here is to aid integration in the arsd.core event loop.
+ Please note that this does not currently and I have no plans as of this writing to add support for any kind of direct file descriptor passing. It always pipes them back to the parent for processing. If you don't want this, call the lower level functions yourself; the reason this class is here is to aid integration in the arsd.core event loop. Of course, I might change my mind on this.
- Of course, I might change my mind on this.
+ Bugs:
+ Not implemented at all on Windows yet.
+/
-class ExternalProcess {
+class ExternalProcess /*: AsyncOperationRequest*/ {
private static version(Posix) {
__gshared ExternalProcess[pid_t] activeChildren;
@@ -5292,12 +5831,14 @@ class ExternalProcess {
version(Posix) {
assert(0, "not implemented command line to posix args yet");
}
+ else throw new NotYetImplementedException();
}
this(string commandLine) {
version(Posix) {
assert(0, "not implemented command line to posix args yet");
}
+ else throw new NotYetImplementedException();
}
this(string[] args) {
@@ -5305,7 +5846,7 @@ class ExternalProcess {
this.program = FilePath(args[0]);
this.args = args;
}
-
+ else throw new NotYetImplementedException();
}
/++
@@ -5316,6 +5857,7 @@ class ExternalProcess {
this.program = program;
this.args = args;
}
+ else throw new NotYetImplementedException();
}
// you can modify these before calling start
@@ -5488,6 +6030,7 @@ class ExternalProcess {
// also need to listen to SIGCHLD to queue up the terminated callback. FIXME
stdoutUnregisterToken = getThisThreadEventLoop().addCallbackOnFdReadable(stdoutFd, new CallbackHelper(&stdoutReadable));
+ stderrUnregisterToken = getThisThreadEventLoop().addCallbackOnFdReadable(stderrFd, new CallbackHelper(&stderrReadable));
}
}
}
@@ -5501,6 +6044,7 @@ class ExternalProcess {
int stderrFd = -1;
ICoreEventLoop.UnregisterToken stdoutUnregisterToken;
+ ICoreEventLoop.UnregisterToken stderrUnregisterToken;
pid_t pid = -1;
@@ -5510,12 +6054,13 @@ class ExternalProcess {
string[] args;
void stdoutReadable() {
- ubyte[1024] buffer;
- auto ret = read(stdoutFd, buffer.ptr, buffer.length);
+ if(stdoutReadBuffer is null)
+ stdoutReadBuffer = new ubyte[](stdoutBufferSize);
+ auto ret = read(stdoutFd, stdoutReadBuffer.ptr, stdoutReadBuffer.length);
if(ret == -1)
throw new ErrnoApiException("read", errno);
if(onStdoutAvailable) {
- onStdoutAvailable(buffer[0 .. ret]);
+ onStdoutAvailable(stdoutReadBuffer[0 .. ret]);
}
if(ret == 0) {
@@ -5525,8 +6070,29 @@ class ExternalProcess {
stdoutFd = -1;
}
}
+
+ void stderrReadable() {
+ if(stderrReadBuffer is null)
+ stderrReadBuffer = new ubyte[](stderrBufferSize);
+ auto ret = read(stderrFd, stderrReadBuffer.ptr, stderrReadBuffer.length);
+ if(ret == -1)
+ throw new ErrnoApiException("read", errno);
+ if(onStderrAvailable) {
+ onStderrAvailable(stderrReadBuffer[0 .. ret]);
+ }
+
+ if(ret == 0) {
+ stderrUnregisterToken.unregister();
+
+ close(stderrFd);
+ stderrFd = -1;
+ }
+ }
}
+ private ubyte[] stdoutReadBuffer;
+ private ubyte[] stderrReadBuffer;
+
void waitForCompletion() {
getThisThreadEventLoop().run(&this.isComplete);
}
@@ -5594,22 +6160,6 @@ unittest {
static int received;
- static void tester() {
- received++;
- //writeln(cast(void*) Thread.getThis, " ", received);
- }
-
- foreach(ref thread; pool) {
- thread = new Thread(() {
- getThisThreadEventLoop().run(() {
- return shouldExit;
- });
- });
- thread.start();
- }
-
-
-
proc.writeToStdin("hello!");
proc.writeToStdin(null); // closes the pipe
@@ -5618,6 +6168,8 @@ unittest {
assert(proc.status == 0);
assert(c == 2);
+
+ // writeln("here");
}
+/
@@ -5651,42 +6203,53 @@ unittest {
=================
+/
+private void appendToBuffer(ref char[] buffer, ref int pos, scope const(char)[] what) {
+ auto required = pos + what.length;
+ if(buffer.length < required)
+ buffer.length = required;
+ buffer[pos .. pos + what.length] = what[];
+ pos += what.length;
+}
+
+private void appendToBuffer(ref char[] buffer, ref int pos, long what) {
+ if(buffer.length < pos + 16)
+ buffer.length = pos + 16;
+ auto sliced = intToString(what, buffer[pos .. $]);
+ pos += sliced.length;
+}
+
/++
- A `writeln` that actually works.
+ A `writeln` that actually works, at least for some basic types.
It works correctly on Windows, using the correct functions to write unicode to the console. even allocating a console if needed. If the output has been redirected to a file or pipe, it writes UTF-8.
- This always does text. See also WritableStream and WritableTextStream
+ This always does text. See also WritableStream and WritableTextStream when they are implemented.
+/
void writeln(T...)(T t) {
char[256] bufferBacking;
char[] buffer = bufferBacking[];
int pos;
+
foreach(arg; t) {
static if(is(typeof(arg) : const char[])) {
- buffer[pos .. pos + arg.length] = arg[];
- pos += arg.length;
+ appendToBuffer(buffer, pos, arg);
} else static if(is(typeof(arg) : stringz)) {
- auto b = arg.borrow;
- buffer[pos .. pos + b.length] = b[];
- pos += b.length;
+ appendToBuffer(buffer, pos, arg.borrow);
} else static if(is(typeof(arg) : long)) {
- auto sliced = intToString(arg, buffer[pos .. $]);
- pos += sliced.length;
+ appendToBuffer(buffer, pos, arg);
} else static if(is(typeof(arg.toString()) : const char[])) {
- auto s = arg.toString();
- buffer[pos .. pos + s.length] = s[];
- pos += s.length;
+ appendToBuffer(buffer, pos, arg.toString());
} else {
- auto s = "<unsupported type>";
- buffer[pos .. pos + s.length] = s[];
- pos += s.length;
- // static assert(0, "Unsupported type: " ~ T.stringof);
+ appendToBuffer(buffer, pos, "<" ~ typeof(arg).stringof ~ ">");
}
}
- buffer[pos++] = '\n';
+ appendToBuffer(buffer, pos, "\n");
+
+ actuallyWriteToStdout(buffer[0 .. pos]);
+}
+private void actuallyWriteToStdout(scope char[] buffer) @trusted {
version(Windows) {
import core.sys.windows.wincon;
@@ -5698,17 +6261,17 @@ void writeln(T...)(T t) {
if(GetFileType(hStdOut) == FILE_TYPE_CHAR) {
wchar[256] wbuffer;
- auto toWrite = makeWindowsString(buffer[0 .. pos], wbuffer, WindowsStringConversionFlags.convertNewLines);
+ auto toWrite = makeWindowsString(buffer, wbuffer, WindowsStringConversionFlags.convertNewLines);
DWORD written;
WriteConsoleW(hStdOut, toWrite.ptr, cast(DWORD) toWrite.length, &written, null);
} else {
DWORD written;
- WriteFile(hStdOut, buffer.ptr, pos, &written, null);
+ WriteFile(hStdOut, buffer.ptr, cast(DWORD) buffer.length, &written, null);
}
} else {
import unix = core.sys.posix.unistd;
- unix.write(1, buffer.ptr, pos);
+ unix.write(1, buffer.ptr, buffer.length);
}
}