HomeForumSourceResearchGuide
Sign in to contribute to source. how it works
Component ws.RequestHandlerEV by barry
expand copy to clipboardexpand
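//event-driven request handler: network worker thread(s) watch non-blocking sockets through a TCPMonitor, and a pool of request worker threads processes the parsed requests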

//plain-text body sent with the fallback 404 response when no 404.html file exists
const char MSG_404[] = "HTTP 404: Content not found on this server"

//maximum number of bytes read from a file per chunk while streaming it to a client
const int MAX_CHUNK = 10240
//rounding increment used when a connection's receive buffer is grown
const int BUF_SIZE_INCREMENT = 2048
//maximum number of bytes requested from the stream per recv() call
const int RECV_SIZE = 2048

//size limits checked by receiveBytes(); exceeding either one closes the connection
const int MAX_HEADER = 10240 //10KB
const int MAX_PAYLOAD = 1048576 //1MB

//number of request-processing threads and of network-monitoring threads created by the constructor
const int REQ_THREAD_POOL_SIZE = 32
const int NET_THREAD_POOL_SIZE = 1

//idle limit used by pruneConnections(), compared against differences of cal.getMS() values
const int MAX_INACTIVE = 2000
//timeout passed to TCPMonitor.wait() on every network thread iteration (presumably milliseconds -- confirm)
const int MONITOR_WAIT_TIME = 100

//return codes of receiveBytes()
const int RECV_CONNECTION_CONTINUE = 1
const int RECV_CONNECTION_CLOSE = 2

//A parsed HTTP request line plus headers, as produced by parseCommand().
data Command {
	//HTTP verb of the request; stays 0 when the verb is not recognised by parseCommand()
	const int GET = 1
	const int POST = 2
	const int PUT = 3
	const int DELETE = 4
	const int HEAD = 5
	int type
	
	//HTTP protocol version taken from the request line
	const int HTTP_1_0 = 1
	const int HTTP_1_1 = 2
	int version
	
	//value of the "host" header, if one was sent
	char hostname[]
	
	//requested resource, and the text following the last "." seen in it (if any)
	char resource[]
	char ext[]
	
	//true when a "connection" header containing "keep-alive" was sent
	bool persistent
	
	//all parsed headers; keys are lowercased by parseCommand()
	Header headers[]
}

//One complete request queued for a request worker thread.
// NOTE: receiveBytes() constructs this type positionally (client, tlsClient, stream, landingPages, cfg), so the order of the leading fields matters.
data WorkItem {
	TCPSocket client
	TLS tlsClient
	//the stream that responses are written to (processStream() sets it to the TLS object when TLS is in use, otherwise the socket)
	Stream stream
	String landingPages[]
	ConfigData cfg
	//not referenced by the code visible in this component
	bool killed
	//threadID of the request worker this item was queued on
	int threadID
	//the connection this request arrived on
	NetWorkItem fromNetItem

	//the parsed request, and its payload (if any)
	Command nextCommand
	byte payload[]

	//next item in the owning worker's queue
	WorkItem next
}

//Per-connection state tracked by a network worker thread.
// NOTE: processStream() constructs this type positionally (client, tlsClient, stream, cfg), so the order of the leading fields matters.
data NetWorkItem nocycle {
	TCPSocket client
	TLS tlsClient
	Stream stream
	ConfigData cfg
	//idleTime, waiting and killed are not referenced by the code visible in this component
	int idleTime
	bool waiting
	bool killed
	//sequence number assigned in processStream(), used in debug output
	int workID

	//connection state machine values
	const byte S_WAIT_REQUEST = 0
	const byte S_WAIT_PAYLOAD = 1

	const byte S_TLS_ACCEPTING = 2
	const byte S_TLS_CLOSING = 3

	const byte S_CLOSED = 4
	byte state

	//request currently being assembled: receive buffer, number of bytes used in it, and expected payload size
	Command nextCommand
	byte buf[]
	int bufNextByte
	int payloadPending

	int lastActiveTime //timed eviction
	bool evict //immediate eviction
	//number of requests from this connection that are queued or being processed
	int reqWorkCount
	//not referenced by the code visible in this component
	int unsentBytes

	//request worker that all requests from this connection are queued on
	ReqWorkerThread curReqThread

	//doubly-linked list of connections owned by a network worker thread
	NetWorkItem next
	NetWorkItem prev
}

//State of one request worker thread: its thread handle and its queue of pending requests.
data ReqWorkerThread {
	Thread thread
	int threadID
	
	//head and tail of the singly-linked request queue
	WorkItem work
	WorkItem lastWork
	//number of requests queued or in progress; getReqWorker() picks the worker with the lowest value
	int workCount
}

//Details object passed to setNonBlocking() and read back from ed.details in the event sinks.
data NetThreadInfo {
	//index into netThreadPool of the network worker that owns the socket
	int index
}

//State of one network worker thread: its socket monitor and its list of open connections.
data NetWorkerThread {
	Thread thread
	int threadID
	TCPMonitor monitor
	
	//head and tail of the connection list, kept in least-recently-active-first order by moveWorkToEnd()
	NetWorkItem work
	NetWorkItem workEnd
	//NOTE(review): read by netWorkerThread() on shutdown but never modified by the code visible in this component
	int workCount
}

component provides RequestHandler(Destructor) requires io.Output out, io.FileSystem fileSystem, io.File, ws.DocStream, data.StringUtil stringUtil,
 data.IntUtil intUtil, ws.Web rh, time.Timer timer, net.http.Util util, time.Calendar cal, net.TCPMonitor, net.TLS {
	
	//prefix prepended to every resource served by serveFile()
	char web_root[] = "./"
	
	//worker pools, populated by the constructor
	ReqWorkerThread reqThreadPool[] = new ReqWorkerThread[REQ_THREAD_POOL_SIZE]
	NetWorkerThread netThreadPool[] = new NetWorkerThread[NET_THREAD_POOL_SIZE]
	
	//held by getReqWorker() while it scans the request worker pool
	Mutex workerLock = new Mutex()
	
	//NOTE(review): these three are not referenced by the code visible in this component
	char END_SEQ[] = "\r\n\r\n"
	char END_R[] = "\r"
	char END_N[] = "\n"

	//when true, connection and request events are printed via out.println()
	bool debugConnections
	
	//set by destroy() to ask the worker threads to exit
	bool shutdown

	//next value handed out as NetWorkItem.workID
	int nextWorkID = 0

	//Append value, followed by a newline, to the end of the named file.
	void logData(char fileName[], char value[])
		{
		File log = new File(fileName, File.WRITE)
		
		//seek to the current end of the file so existing content is kept
		int endOfFile = log.getSize()
		log.setPos(endOfFile)
		
		log.write(value)
		log.write("\n")
		
		log.close()
		}
	
	//Copy len bytes from src (starting at srcOffset) into dest (starting at destOffset).
	// Bytes are copied in ascending index order.
	void copyBytes(byte dest[], byte src[], int destOffset, int srcOffset, int len)
		{
		for (int n = 0; n < len; n++)
			{
			dest[destOffset + n] = src[srcOffset + n]
			}
		}
	
	//Parse the request line and headers of an HTTP request held in buf[0..top).
	// - buf is modified in place: the verb, the version and all header keys are lowercased
	// - the returned Command carries the verb, resource, file extension, version, headers,
	//   the "host" header value and the keep-alive flag
	// - throws an Exception when the request line or a header line is malformed
	Command parseCommand(char buf[], int top)
		{
		//we use a custom parser here, to detect "\r\n", ":", and spaces
		// - also detect keep-alive right here, and tag it as a boolean on the Command instance
		// - we also perform lowercase operations manually as we go
		// - and we extract the file extension for mime type conversion later

		Command cmd = new Command()
		Header headers[]
		
		int i = 0
		
		//request-line modes: 0 = verb, 1 = resource, 2 = version, 3 = expecting "\n" after "\r"
		byte mode = 0
		int start = 0
		int dot = 0
		
		//first we parse the verb / resource line
		for (i = 0; i < top; i++)
			{
			if (mode == 0)
				{
				if (buf[i] == " ")
					{
					//this is the end of the HTTP verb
					char verb[] = new char[i-start]
					copyBytes(verb, buf, 0, start, i-start)
					
					//map every verb that Command declares, so that verb-specific handling
					//(such as the HEAD branch in processRequest) is reachable; anything else stays 0
					if (verb == "get") cmd.type = Command.GET
					else if (verb == "post") cmd.type = Command.POST
					else if (verb == "put") cmd.type = Command.PUT
					else if (verb == "delete") cmd.type = Command.DELETE
					else if (verb == "head") cmd.type = Command.HEAD
					
					mode = 1
					
					start = i + 1
					}
					else if (buf[i] >= "A" && buf[i] <= "Z")
					{
					//lowercase this
					buf[i] += 32
					}
				}
				else if (mode == 1)
				{
				if (buf[i] == " ")
					{
					//this is the end of the HTTP resource
					char resource[] = new char[i-start]
					copyBytes(resource, buf, 0, start, i-start)
					
					//capture the file extension, if any (the text after the last "." in the resource)
					if (dot != 0)
						{
						char ext[] = new char[i-(dot+1)]
						copyBytes(ext, buf, 0, dot+1, i-(dot+1))
						
						cmd.ext = ext
						}
					
					cmd.resource = resource
					
					mode = 2
					
					start = i + 1
					}
					else if (buf[i] == ".")
					{
					dot = i
					}
				}
				else if (mode == 2)
				{
				if (buf[i] == "\r")
					{
					mode = 3
					}
					else if (buf[i] >= "A" && buf[i] <= "Z")
					{
					//lowercase this
					buf[i] += 32
					}
				}
				else if (mode == 3)
				{
				if (buf[i] == "\n")
					{
					//this is the end of the HTTP version (excluding the "\r" before this "\n")
					char version[] = new char[i-start-1]
					copyBytes(version, buf, 0, start, i-start-1)
					
					if (version == "http/1.0") { cmd.version = Command.HTTP_1_0 }
					else if (version == "http/1.1") { cmd.version = Command.HTTP_1_1 }
					
					start = i + 1
					
					break
					}
					else
					{
					throw new Exception("malformed HTTP request, expected newline in verb")
					}
				}
			}
		
		if (cmd.resource == null)
			throw new Exception("malformed HTTP request, missing resource in verb")
		
		if (cmd.version == 0)
			throw new Exception("malformed HTTP request, missing version in verb")
		
		//next we have headers
		// header modes: 0 = key (up to ":"), 1 = value (up to "\r"), 2 = expecting "\n" after "\r"
		// - i still indexes the "\n" that ended the request line; that character is ignored by mode 0
		char hkey[]
		char hval[]
		mode = 0
		for (; i < top; i++)
			{
			if (mode == 0)
				{
				if (buf[i] == ":")
					{
					int end = i
					
					//drop any spaces between the key and the colon
					// (buf[end] is the colon itself, so the character before it is the one to test)
					while (end > start && buf[end-1] == " ") end --
					
					if (end <= start) throw new Exception("malformed HTTP request, missing key in header line")
					
					hkey = new char[end-start]
					copyBytes(hkey, buf, 0, start, end-start)
					
					mode = 1
					
					start = i + 1
					}
					else if (buf[i] >= "A" && buf[i] <= "Z")
					{
					//lowercase this
					buf[i] += 32
					}
				}
				else if (mode == 1)
				{
				if (buf[i] == "\r")
					{
					mode = 2
					}
				}
				else if (mode == 2)
				{
				if (buf[i] == "\n")
					{
					//skip leading spaces of the value
					while (buf[start] == " ") start ++
					
					if (i <= start-1) throw new Exception("malformed HTTP request, missing value in header line")
					
					hval = new char[i-start-1]
					copyBytes(hval, buf, 0, start, i-start-1)
					
					headers = new Header[](headers, new Header(hkey, hval))
					
					//special header checks
					if (hkey == "host") cmd.hostname = hval
					if (hkey == "connection") cmd.persistent = hval.ifind("keep-alive") != StringUtil.NOT_FOUND
					
					mode = 0
					
					start = i + 1
					}
					else
					{
					throw new Exception("malformed HTTP request, expected newline in header")
					}
				}
			}
		
		cmd.headers = headers
		
		return cmd
		}
	
	//Return the value of the first header of cmd whose key equals the lowercased key,
	// or null when no header matches.
	char[] getHeaderValue(Command cmd, char key[])
		{
		char wanted[] = stringUtil.lowercase(key)
		
		int h = 0
		while (h < cmd.headers.arrayLength)
			{
			Header candidate = cmd.headers[h]
			
			if (candidate.key == wanted)
				{
				return candidate.value
				}
			
			h ++
			}
		
		return null
		}
	
	//Pass a GET for command to the web application (rh) and report its result.
	// The path parameter is not used here.
	bool serveFunction_GET(DocStream ds, char path[], char command[])
		{
		bool handled = rh.get(command, ds)
		return handled
		}
	
	//Pass a POST for command, with its content type and payload, to the web application (rh) and report its result.
	// The path parameter is not used here.
	bool serveFunction_POST(DocStream ds, char path[], char command[], char ctype[], byte payload[])
		{
		bool handled = rh.post(command, ctype, payload, ds)
		return handled
		}
	
	//Look up the MIME type configured for a file extension.
	// Returns "application/octet-stream" when ext is null or has no entry in mimeTypes.
	char[] getMIMEType(char ext[], KeyVal mimeTypes[])
		{
		char mime[] = "application/octet-stream"
		
		if (ext != null)
			{
			for (int m = 0; m < mimeTypes.arrayLength; m ++)
				{
				if (mimeTypes[m].key == ext)
					{
					//first matching entry wins
					mime = mimeTypes[m].val
					break
					}
				}
			}
		
		return mime
		}
	
	//Return the value configured for host in the subdomains table, or null if host has no entry.
	char[] getResourcePrepend(char host[], KeyVal subdomains[])
		{
		char prepend[] = null
		
		for (int d = 0; d < subdomains.arrayLength; d ++)
			{
			if (subdomains[d].key == host)
				{
				//first matching entry wins
				prepend = subdomains[d].val
				break
				}
			}
		
		return prepend
		}
	
	//Send the file identified by command (relative to web_root) to stream s as an HTTP response.
	// - command: requested resource, as sent by the client
	// - ext: file extension used to choose the Content-Type (overridden to "html" for .html landing pages)
	// - keepAlive: selects the Connection header value
	// - includeContent: when false only the headers are sent
	// - hdrLine: optional status line, defaults to "HTTP/1.1 200 OK"
	// Returns true when a response was sent. Returns false, without sending anything, when the
	// resource is rejected, does not exist, is a directory with no landing page, or cannot be opened.
	bool serveFile(char command[], char ext[], Stream s, ConfigData cfg, bool keepAlive, bool includeContent, opt char hdrLine[])
		{
		//command comes straight from the client; refuse anything containing ".." so that a request
		//cannot climb out of web_root (this also rejects file names that merely contain "..")
		if (stringUtil.find(command, "..", 0) != StringUtil.NOT_FOUND)
			return false
		
		char path[] = new char[](web_root, command)

		if (hdrLine == null) hdrLine = "HTTP/1.1 200 OK"
		
		if (!fileSystem.exists(path))
			return false
		
		if (fileSystem.getInfo(path).type == FileInfo.TYPE_DIR)
			{
			//landing page names are appended directly, so make sure the directory ends with a separator
			if (path[path.arrayLength-1] != "/") path = new char[](path, "/")
			
			//try common landing pages
			bool found = false
			for (int i = 0; i < cfg.landingPages.arrayLength; i++)
				{
				char test[] = new char[](path, cfg.landingPages[i].string)
				
				if (fileSystem.exists(test))
					{
					if (cfg.landingPages[i].string.endsWith(".html")) ext = "html"
					
					path = test
					found = true
					break
					}
				}
			
			if (!found)
				{
				return false
				}
			}
		
		//open the file before any header is sent, so that a failure can still be reported by the caller
		File fd = new File(path, File.READ)
		
		if (fd == null)
			return false
		
		s.send("$hdrLine\r\n")
		s.send("Server: Dana Web Engine\r\n")
		if (!keepAlive)
			s.send("Connection: close\r\n")
			else
			s.send("Connection: keep-alive\r\n")
		s.send("Content-length: $(intUtil.makeString(fd.getSize()))\r\n")
		s.send("Content-Type: $(getMIMEType(ext, cfg.mimeTypes))\r\n")
		s.send("Date: $(util.getDateString(cal.getTime()))\r\n")
		s.send("Last-Modified: $(util.getDateString(fileSystem.getInfo(path).modified))\r\n")
		s.send("\r\n")
		
		if (includeContent)
			{
			//stream the file in chunks of at most MAX_CHUNK bytes
			while (!fd.eof())
				{
				byte buf[] = fd.read(MAX_CHUNK)
				s.send(buf)
				}
			}
		
		fd.close()
		
		return true
		}
	
	//Report whether command begins with any of the prefixes listed in staticServe.
	bool staticFile(char command[], String staticServe[])
		{
		bool matched = false
		
		for (int p = 0; p < staticServe.arrayLength; p++)
			{
			char prefix[] = staticServe[p].string
			
			//only compare when command is at least as long as the prefix
			if (command.arrayLength >= prefix.arrayLength)
				{
				if (stringUtil.subString(command, 0, prefix.arrayLength) == prefix)
					{
					matched = true
					break
					}
				}
			}
		
		return matched
		}
	
	//Handle one queued request and write the HTTP response to its stream.
	// - GET: serves a static file when the resource matches a staticServe prefix, otherwise calls the web application
	// - HEAD: serves headers only for static files
	// - POST: calls the web application with the content type and payload
	// - any other verb: 501 Not Implemented
	// - when nothing produced a response: 404, from 404.html if that file exists, else a plain-text message
	// On every path the connection's reqWorkCount is decremented and its lastActiveTime refreshed.
	void processRequest(WorkItem work)
		{
		Stream stream = work.stream
		
		String staticServe[] = work.cfg.staticServe

		//parse headers, extract command
		//call function associated with command, return result (as HTTP response)
		Command cmd = work.nextCommand
		
		if (cmd == null)
			{
			//malformed / interrupted / idle-timeout request (this is normal behaviour)
			// - the work item was still counted against its connection when it was queued, so release it here;
			//   otherwise reqWorkCount never returns to zero and pruneConnections() can never evict the connection
			mutex(work.fromNetItem)
				{
				work.fromNetItem.lastActiveTime = cal.getMS()
				work.fromNetItem.reqWorkCount --
				}
			
			return
			}

		//only touch cmd after the null check above
		if (debugConnections) out.println("[NWID $(work.fromNetItem.workID)] process req $(cmd.resource)")
		
		NetworkEndpoint endpoint = work.client.getRemoteEndpoint()
		
		bool keepAlive = cmd.persistent
		
		char subDomainPath[] = getResourcePrepend(cmd.hostname, work.cfg.subdomains)
		//never assigned; passed through to the serveFunction_* helpers, which do not use it
		char handler[]
		bool success = false
		
		if (cmd.type == Command.GET)
			{
			if (staticFile(cmd.resource, staticServe))
				{
				success = serveFile(cmd.resource, cmd.ext, stream, work.cfg, keepAlive, true)
				}
				else 
				{
				if (subDomainPath != null) cmd.resource = "/$subDomainPath$(cmd.resource)"
				
				DocStream ds = new DocStream(stream, new Header[](new Header("x-client-ip", endpoint.address), cmd.headers))
				success = serveFunction_GET(ds, handler, cmd.resource)
				}
			}
			else if (cmd.type == Command.HEAD)
			{
			if (staticFile(cmd.resource, staticServe))
				{
				success = serveFile(cmd.resource, cmd.ext, stream, work.cfg, keepAlive, false)
				}
				else 
				{
				//TODO. -- what's the best option here? a Web.head() method? or just some auto-response?
				}
			}
			else if (cmd.type == Command.POST)
			{
			if (subDomainPath != null) cmd.resource = "/$subDomainPath$(cmd.resource)"
			
			DocStream ds = new DocStream(stream, new Header[](new Header("x-client-ip", endpoint.address), cmd.headers))
			
			//the payload was already collected by the network thread
			char ctype[] = getHeaderValue(cmd, "content-type")
			byte payload[] = work.payload
			success = serveFunction_POST(ds, handler, cmd.resource, ctype, payload)
			}
			else
			{
			//declare an empty body, so that a client on a kept-alive connection knows where the response ends
			stream.send("HTTP/1.1 501 Not Implemented\r\n")
			stream.send("Server: Dana Web Engine\r\n")
			if (keepAlive)
				stream.send("Connection: keep-alive\r\n")
				else
				stream.send("Connection: close\r\n")
			stream.send("Content-length: 0\r\n")
			stream.send("\r\n")
			success = true
			}
		
		if (!success)
			{
			if (fileSystem.exists("404.html"))
				{
				serveFile("404.html", "html", stream, work.cfg, keepAlive, cmd.type != Command.HEAD, "HTTP/1.1 404 Resource Not Found")
				}
				else
				{
				stream.send("HTTP/1.1 404 Resource Not Found\r\n")
				stream.send("Server: Dana Web Engine\r\n")
				if (keepAlive)
					stream.send("Connection: keep-alive\r\n")
					else
					stream.send("Connection: close\r\n")
				stream.send("Content-length: $(MSG_404.arrayLength)\r\n")
				stream.send("Content-type: text/plain \r\n")
				stream.send("Date: $(util.getDateString(cal.getTime()))\r\n")
				stream.send("\r\n")
				//a response to HEAD carries headers only, matching the 404.html branch above
				if (cmd.type != Command.HEAD) stream.send(MSG_404)
				}
			}
		
		if (debugConnections) out.println("[NWID $(work.fromNetItem.workID)] process req $(cmd.resource) OK")

		mutex(work.fromNetItem)
			{
			work.fromNetItem.lastActiveTime = cal.getMS()
			work.fromNetItem.reqWorkCount --
			}
		}
	
	//Constructor: initialises the web application, then creates and starts every network worker
	// thread (each with its own TCPMonitor) followed by every request worker thread.
	RequestHandler:RequestHandler()
		{
		//init tasks for web app
		rh.setup()

		//network workers
		for (int n = 0; n < NET_THREAD_POOL_SIZE; n++)
			{
			NetWorkerThread netWorker = new NetWorkerThread()
			netThreadPool[n] = netWorker
			
			netWorker.monitor = new TCPMonitor()
			netWorker.thread = asynch::netWorkerThread(netWorker)
			netWorker.threadID = n
			}
		
		//request workers
		for (int r = 0; r < REQ_THREAD_POOL_SIZE; r++)
			{
			ReqWorkerThread reqWorker = new ReqWorkerThread()
			reqThreadPool[r] = reqWorker
			
			reqWorker.thread = asynch::reqWorkerThread(reqWorker)
			reqWorker.threadID = r
			}
		
		//asynch::timerThread()
		}
	
	//Body of a request worker thread: each time the thread is signalled it takes one WorkItem from the
	// front of state's queue, processes it, and closes the connection if the request was not persistent.
	// The thread returns once it is woken with an empty queue while shutdown is set.
	void reqWorkerThread(ReqWorkerThread state)
		{
		while (true)
			{
			//sleep until receiveBytes() or destroy() signals this thread
			this.thread.wait()
			
			//NOTE(review): one item is handled per wake-up; this relies on every signal() producing its own wake-up -- confirm against Thread semantics
			if (state.workCount != 0)
				{
				WorkItem w = state.work
				
				//pop the head of the queue
				mutex(state)
					{
					w = state.work
					state.work = w.next
					if (state.work == null) state.lastWork = null
					w.next = null
					}
				
				processRequest(w)

				//non-persistent request: mark the connection for eviction and start closing it
				if (w.nextCommand != null && !w.nextCommand.persistent)
					{
					w.fromNetItem.evict = true
					//TODO: we MUST pass state into this first parameter, somehow!!!
					closeConnection(null, w.fromNetItem, false)
					}

				//workCount is only reduced once the request has been fully processed
				mutex(state)
					{
					state.workCount --
					w.fromNetItem = null
					w = null
					}
				}
				else if (shutdown)
				{
				return
				}
			}
		}
	
	//Event sink for plain TCP sockets: when the socket is still connected, arm a send notification
	// for it on the monitor of the network worker recorded in the event details.
	eventsink NetEvents(EventData ed)
		{
		//only one event is possible here (sendWait)
		NetThreadInfo info = ed.details
		TCPSocket sock = ed.source

		mutex(info)
			{
			if (sock.connected())
				{
				netThreadPool[info.index].monitor.armSendNotify(sock)
				}
			}
		}
	
	//Event sink for TLS connections: arm a send notification for the underlying socket on the
	// monitor of the network worker recorded in the event details.
	eventsink NetEventsTLS(EventData ed)
		{
		//only one event is possible here (sendWait)
		NetThreadInfo info = ed.details
		TLS secure = ed.source
		TCPSocket sock = secure.getSocket()
		
		netThreadPool[info.index].monitor.armSendNotify(sock)
		}
	
	//Return a new buffer holding the contents of buf, at least newMinSize bytes long.
	// When incrementSize is non-zero the new length is rounded up to a multiple of incrementSize.
	byte[] expandBuffer(byte buf[], int newMinSize, int incrementSize)
		{
		int target = newMinSize
		
		if (incrementSize != 0)
			{
			//round up to the next multiple of incrementSize
			int over = newMinSize % incrementSize
			if (over != 0) target = newMinSize + (incrementSize - over)
			}
		
		byte grown[] = new byte[target]
		grown =[] buf

		return grown
		}
	
	//Receive one chunk of data for a connection and advance its request state machine.
	// - the chunk is appended to item.buf
	// - in S_WAIT_REQUEST the buffer is searched for the end of the header; once found the header is parsed,
	//   removed from the buffer, and the content-length header (if any) decides whether a payload must be awaited
	// - in S_WAIT_PAYLOAD the request becomes ready once payloadPending bytes are buffered
	// - a ready request is queued on a request worker thread (always the same one for a given connection)
	// Returns RECV_CONNECTION_CLOSE when the connection should be closed (header or payload too large,
	// unparseable request, or the peer is no longer connected), otherwise RECV_CONNECTION_CONTINUE.
	int receiveBytes(NetWorkItem item)
		{
		bool requestReady = false
		Command cmd = null

		byte buf[] = item.stream.recv(RECV_SIZE)

		if (buf != null)
			{
			//expand our item's buffer if needed, otherwise append the data to it
			if ((item.bufNextByte + buf.arrayLength) > item.buf.arrayLength)
				{
				//if we're in NetWorkItem.S_WAIT_REQUEST mode, check we're not going to go over some header size limit
				if ((item.state == NetWorkItem.S_WAIT_REQUEST) && (item.bufNextByte + buf.arrayLength > MAX_HEADER))
					return RECV_CONNECTION_CLOSE
				
				item.buf = expandBuffer(item.buf, item.bufNextByte + buf.arrayLength, BUF_SIZE_INCREMENT)
				}
			
			copyBytes(item.buf, buf, item.bufNextByte, 0, buf.arrayLength)

			if (item.state == NetWorkItem.S_WAIT_REQUEST)
				{
				//scan from just before the newly appended bytes, so a terminator split across two receives is still found
				int searchFrom = item.bufNextByte
				item.bufNextByte += buf.arrayLength

				if (searchFrom < 4)
					searchFrom = 0
					else
					searchFrom = searchFrom - 4
				
				int pos = stringUtil.find(item.buf, "\r\n\r\n", searchFrom)
				bool headerComplete = false
				
				if (pos != StringUtil.NOT_FOUND)
					{
					//the buffer can hold stale bytes beyond bufNextByte; only accept a terminator inside the valid region
					headerComplete = (pos + 4) <= item.bufNextByte
					}
				
				if (headerComplete)
					{
					cmd = parseCommand(item.buf, pos + 4)

					//a request that could not be parsed cannot be answered
					if (cmd == null)
						return RECV_CONNECTION_CLOSE

					//copy-down: move every byte that follows the header to the start of the buffer
					// (ascending copy onto a lower offset, so the overlap is safe)
					int afterHeader = item.bufNextByte - (pos + 4)
					if (afterHeader > 0) copyBytes(item.buf, item.buf, 0, pos + 4, afterHeader)
					item.bufNextByte = afterHeader

					//check for a payload
					char clength[] = getHeaderValue(cmd, "content-length")
					if (clength != null)
						{
						item.payloadPending = intUtil.intFromString(clength)

						//check if this payload size is <= some max payload value
						if (item.payloadPending > MAX_PAYLOAD)
							{
							return RECV_CONNECTION_CLOSE
							}

						if (item.bufNextByte < item.payloadPending)
							{
							//keep the grown buffer (the result of expandBuffer must be stored)
							if (item.payloadPending > item.buf.arrayLength) item.buf = expandBuffer(item.buf, item.payloadPending, BUF_SIZE_INCREMENT)
							
							//remember the parsed request until the rest of its payload arrives
							item.nextCommand = cmd
							item.state = NetWorkItem.S_WAIT_PAYLOAD
							}
							else
							{
							requestReady = true
							}
						}
						else
						{
						item.payloadPending = 0
						requestReady = true
						}
					}
				}
				else if (item.state == NetWorkItem.S_WAIT_PAYLOAD)
				{
				item.bufNextByte += buf.arrayLength

				if (item.bufNextByte >= item.payloadPending)
					{
					//ready to process; the connection goes back to waiting for its next request
					cmd = item.nextCommand
					item.nextCommand = null
					item.state = NetWorkItem.S_WAIT_REQUEST
					requestReady = true
					}
				}
			}
			else
			{
			//connection-close?
			if (!item.client.connected())
				return RECV_CONNECTION_CLOSE
			}

		if (requestReady)
			{
			//give the request its own copy of the payload and remove those bytes from the receive buffer,
			//so the buffer can be reused for the next request while this one is being processed
			byte payload[] = null
			
			if (item.payloadPending != 0)
				{
				payload = new byte[item.payloadPending]
				copyBytes(payload, item.buf, 0, 0, item.payloadPending)
				
				int afterPayload = item.bufNextByte - item.payloadPending
				if (afterPayload > 0) copyBytes(item.buf, item.buf, 0, item.payloadPending, afterPayload)
				item.bufNextByte = afterPayload
				item.payloadPending = 0
				}
			
			ReqWorkerThread worker

			//here we make sure that all requests from the same network socket are queued up on the same request worker thread, to make sure they're processed sequentially
			mutex(item)
				{
				if (item.curReqThread == null)
					worker = getReqWorker()
					else
					worker = item.curReqThread

				WorkItem nwi = new WorkItem(item.client, item.tlsClient, item.stream, item.cfg.landingPages, item.cfg)
				nwi.nextCommand = cmd
				nwi.payload = payload
				nwi.fromNetItem = item
				
				//append to the tail of the worker's queue
				mutex(worker)
					{
					nwi.threadID = worker.threadID

					if (worker.lastWork == null)
						worker.work = nwi
						else
						worker.lastWork.next = nwi
					
					worker.lastWork = nwi
					
					worker.workCount ++
					}
				
				item.curReqThread = worker

				item.reqWorkCount ++
				}
				
			worker.thread.signal()
			}
		
		return RECV_CONNECTION_CONTINUE
		}
	
	//Close the connection held by item.
	// - plain TCP: the socket is disconnected and the item marked S_CLOSED; returns true
	// - TLS with immediate == true: the TLS close is attempted once, then the socket is disconnected regardless; returns true
	// - TLS with immediate == false: returns true only if the TLS close completed; otherwise the item stays in
	//   S_TLS_CLOSING and false is returned
	// state may be null (the request worker threads call this without a network worker); in that case no send
	// notification can be armed when TLS reports WAIT_WRITE, and the close is left to be retried later.
	bool closeConnection(NetWorkerThread state, NetWorkItem item, bool immediate)
		{
		if (item.tlsClient != null)
			{
			item.state = NetWorkItem.S_TLS_CLOSING
			int status = item.tlsClient.close()
			if (!immediate)
				{
				if (status == TLS.OK)
					{
					item.client.disconnect()
					item.state = NetWorkItem.S_CLOSED
					return true
					}
					else if (status == TLS.WAIT_WRITE)
					{
					//only a caller that owns a monitor can ask to be told when the socket is writable again
					if (state != null) state.monitor.armSendNotify(item.client)
					}
				}
				else
				{
				item.client.disconnect()
				item.state = NetWorkItem.S_CLOSED
				return true
				}
			return false
			}
			else
			{
			item.client.disconnect()
			item.state = NetWorkItem.S_CLOSED
			}
		
		return true
		}
	
	//Remove item from state's monitor and from its doubly-linked connection list.
	// Calling this for an item that is not (or no longer) in the list does nothing.
	void closeItem(NetWorkerThread state, NetWorkItem item)
		{
		//remove the item from our work list
		mutex(state)
			{
			//a linked item is either the head of the list or has at least one neighbour;
			//anything else was already removed, and unlinking it again would dereference null neighbours
			bool linked = (item === state.work) || item.prev != null || item.next != null
			
			if (!linked)
				{
				return
				}
			
			state.monitor.remSocket(item.client)
			item.curReqThread = null

			if (item === state.workEnd)
				{
				//it's the tail: either the only element, or the previous element becomes the tail
				if (state.work === state.workEnd)
					{
					state.work = null
					state.workEnd = null
					}
					else
					{
					state.workEnd = state.workEnd.prev
					state.workEnd.next = null
					}
				}
				else if (item === state.work)
				{
				//if it's at the start, some special casing
				state.work.next.prev = null
				state.work = state.work.next
				}
				else
				{
				//it's something in the middle
				item.next.prev = item.prev
				item.prev.next = item.next
				}
			
			item.next = null
			item.prev = null
			}
		}
	
	//Move item to the tail of state's connection list (nothing happens if it already is the tail).
	void moveWorkToEnd(NetWorkerThread state, NetWorkItem item)
		{
		mutex(state)
			{
			//already the tail: nothing to move
			if (item === state.workEnd)
				{
				return
				}
			
			//unlink the item from its current position
			NetWorkItem after = item.next
			
			if (item === state.work)
				{
				//head of the list: its successor becomes the new head
				after.prev = null
				state.work = after
				}
				else
				{
				//somewhere in the middle: join its two neighbours
				NetWorkItem before = item.prev
				after.prev = before
				before.next = after
				}
			
			//append it after the current tail
			NetWorkItem tail = state.workEnd
			
			item.next = null
			item.prev = tail
			tail.next = item

			state.workEnd = item
			}
		}
	
	//Close and remove connections from the front of state's list for as long as the front connection
	// has no request work outstanding and is either flagged for eviction or has been inactive for more
	// than MAX_INACTIVE. Stops at the first connection that does not qualify or cannot be closed yet.
	void pruneConnections(NetWorkerThread state, int now)
		{
		mutex (state)
			{
			NetWorkItem head = state.work
			
			while (head != null)
				{
				//the clock may have wrapped: restart this connection's idle period from now and come back next time
				if (head.lastActiveTime > now)
					{
					head.lastActiveTime = now
					return
					}
				
				//a connection with request work outstanding is never pruned
				bool evictable = head.reqWorkCount == 0 && (head.evict || ((now - head.lastActiveTime) > MAX_INACTIVE))
				
				if (!evictable)
					{
					return
					}
				
				if (debugConnections) out.println(" -- [NWID $(head.workID)] connection-evict")
				
				//closeConnection() is only attempted when the connection is not already closed
				bool closedNow = head.state == NetWorkItem.S_CLOSED || closeConnection(state, head, false)
				
				if (!closedNow)
					{
					return
					}
				
				closeItem(state, head)
				
				//closeItem() advanced the list; look at the new front
				head = state.work
				}
			}
		}

	//Continue a TLS handshake that is still in progress on item.
	// On TLS.OK the connection starts waiting for requests; on TLS.WAIT_WRITE a send notification is armed.
	void continueTLSAccept(NetWorkerThread state, NetWorkItem item)
		{
		int status = item.tlsClient.accept(item.client)

		if (status == TLS.OK)
			{
			item.state = NetWorkItem.S_WAIT_REQUEST
			}
			else if (status == TLS.WAIT_WRITE)
			{
			state.monitor.armSendNotify(item.client)
			}
		}
	
	//Continue a TLS shutdown that is still in progress on item.
	// Returns true when the shutdown completed, in which case the socket has been disconnected, the item
	// marked S_CLOSED and removed from state's list. On TLS.WAIT_WRITE a send notification is armed.
	bool continueTLSClose(NetWorkerThread state, NetWorkItem item)
		{
		int status = item.tlsClient.close()

		if (status == TLS.OK)
			{
			item.client.disconnect()
			item.state = NetWorkItem.S_CLOSED
			closeItem(state, item)
			return true
			}
			else if (status == TLS.WAIT_WRITE)
			{
			state.monitor.armSendNotify(item.client)
			}
		
		return false
		}
	
	//Body of a network worker thread: waits on the monitor for socket events, advances each affected
	// connection (TLS handshake / shutdown, receiving requests, flushing buffered sends, peer close) and
	// then prunes idle or evicted connections. Returns when shutdown is set and state.workCount is zero.
	void netWorkerThread(NetWorkerThread state)
		{
		while (true)
			{
			MonitorEvent events[] = state.monitor.wait(MONITOR_WAIT_TIME)

			int now = cal.getMS()

			for (int i = 0; i < events.arrayLength; i++)
				{
				NetWorkItem item = events[i].userData

				//set as soon as this iteration has closed and removed the connection, so that no later step touches it again
				bool closed = false

				//update the last-active time on this item
				item.lastActiveTime = now

				//move the work item to the end of our list, so we end up with a sorted list of least-recently-active at the start
				moveWorkToEnd(state, item)

				if (events[i].recvReady && item.state != NetWorkItem.S_CLOSED)
					{
					if (item.state == NetWorkItem.S_TLS_ACCEPTING)
						{
						continueTLSAccept(state, item)
						}
						else if (item.state == NetWorkItem.S_TLS_CLOSING)
						{
						closed = continueTLSClose(state, item)
						}
						else if (!item.evict)
						{
						if (debugConnections) out.println("[NWID $(item.workID)] client socket receive-ready")
						int status = receiveBytes(item)

						if (status == RECV_CONNECTION_CLOSE)
							{
							if (debugConnections) out.println(" -- [NWID $(item.workID)] recv requests connection-close")

							if (closeConnection(state, item, false))
								{
								closeItem(state, item)
								closed = true
								}
							}
						}
					}
				
				//skip connections that are already closed, whether by this iteration or by a request worker
				if (events[i].sendReady && !closed && item.state != NetWorkItem.S_CLOSED)
					{
					if (item.state == NetWorkItem.S_TLS_ACCEPTING)
						{
						continueTLSAccept(state, item)
						}
						else if (item.state == NetWorkItem.S_TLS_CLOSING)
						{
						closed = continueTLSClose(state, item)
						}
						else
						{
						if (debugConnections) out.println(" -- [NWID $(item.workID)] client socket send-ready")
						if (item.tlsClient != null)
							item.tlsClient.sendBuffer()
							else
							events[i].socket.sendBuffer()
						}
					}
				
				if (events[i].close && !closed)
					{
					//this is a TCP-level socket closed event, meaning the client has killed the TCP connection
					// - there seems to be a case here where TLS will permanently say "want read" on shutdown, so we pass a flag to closeConnection() to say "close TCP immediately and stop" in the TLS case
					if (debugConnections) out.println(" -- [NWID $(item.workID)] connection-close event")
					if (closeConnection(state, item, true))
						{
						closeItem(state, item)
						}
					}
				}
			
			pruneConnections(state, now)
			
			if (state.workCount == 0 && shutdown)
				{
				return
				}
			}
		}
	
	//Return the request worker with the lowest workCount (the lowest index wins a tie).
	ReqWorkerThread getReqWorker()
		{
		int best = 0
		
		mutex(workerLock)
			{
			int candidate = 1
			
			while (candidate < reqThreadPool.arrayLength)
				{
				if (reqThreadPool[candidate].workCount < reqThreadPool[best].workCount)
					{
					best = candidate
					}
				
				candidate ++
				}
			}
		
		return reqThreadPool[best]
		}
	
	//Register a newly accepted client socket with the network worker thread.
	// - s: the client socket
	// - tlsCtx: when not null the connection is wrapped in TLS and starts in S_TLS_ACCEPTING
	// - cfg: configuration stored on the connection and passed along with each of its requests
	// The socket is switched to non-blocking mode, appended to the worker's connection list and added to its monitor.
	void RequestHandler:processStream(store TCPSocket s, store TLSContext tlsCtx, store ConfigData cfg)
		{
		TLS tls

		//requests are read from / responses written to the socket itself unless TLS is in use
		Stream stream = s

		//out.println("NEW connection")

		if (tlsCtx != null)
			{
			tls = new TLS(tlsCtx)
			stream = tls
			//out.println(" (SSL)")
			}
		
		NetWorkItem nwi = new NetWorkItem(s, tls, stream, cfg)
		nwi.buf = new byte[BUF_SIZE_INCREMENT]
		nwi.workID = nextWorkID
		nwi.lastActiveTime = cal.getMS()

		//NOTE(review): nextWorkID is updated outside any mutex -- confirm processStream() is never called concurrently
		nextWorkID ++

		//we currently only ever use the first net thread...
		int workerIndex = 0

		NetWorkerThread thread = netThreadPool[workerIndex]

		mutex(thread)
			{
			//the NetThreadInfo is what the event sinks later receive as ed.details
			s.setNonBlocking(new NetThreadInfo(workerIndex))

			//if "tls" is true we need to call tls.accept(), record the TLS state machine within nwi, and only sinkevent on tls
			if (tls == null)
				{
				sinkevent NetEvents(s)
				}
				else
				{
				tls.setNonBlocking(new NetThreadInfo(workerIndex))
				nwi.state = NetWorkItem.S_TLS_ACCEPTING
				sinkevent NetEventsTLS(tls)
				}

			//append the connection to the tail of the worker's list
			if (thread.work == null)
				{
				thread.work = nwi
				}
				else
				{
				thread.workEnd.next = nwi
				nwi.prev = thread.workEnd
				}
			
			thread.workEnd = nwi

			//nwi is what the monitor later reports as the event's userData
			thread.monitor.addSocket(s, nwi)

			//start the TLS handshake; if it cannot finish now, netWorkerThread() continues it on later socket events
			if (tls != null)
				{
				int status = tls.accept(s)
				if (status == TLS.OK)
					{
					nwi.state = NetWorkItem.S_WAIT_REQUEST
					//out.println(" (SSL ACCEPT_OK)")
					}
					else if (status == TLS.WAIT_WRITE)
					{
					thread.monitor.armSendNotify(s)
					//out.println(" (SSL ACCEPT_WW)")
					}
				}

			if (debugConnections) out.println("client socket added as [NWID $(nwi.workID)]")
			}
		}
	
	//Destructor: sets the shutdown flag, signals and joins every request worker, joins every network
	// worker and removes the connections it still holds, then releases both thread pools.
	void Destructor:destroy()
		{
		shutdown = true

		//request workers sleep until signalled, so wake each one before joining it
		for (int r = 0; r < reqThreadPool.arrayLength; r++)
			{
			Thread reqThread = reqThreadPool[r].thread
			reqThread.signal()
			reqThread.join()
			}
		
		for (int n = 0; n < netThreadPool.arrayLength; n++)
			{
			NetWorkerThread netWorker = netThreadPool[n]
			
			netWorker.thread.join()

			//drop whatever connections this worker was still tracking
			while (netWorker.work != null)
				{
				closeItem(netWorker, netWorker.work)
				}
			}
		
		reqThreadPool = null
		netThreadPool = null
		}
	
	}
Revision history
To propose a new revision to this entity, use dana source put -uc your/new/version.dn -n ws.RequestHandlerEV -m "reason for update" -u yourUsername
Version 3 (this version) by barry
Notes for this version: Adds x-client-ip header to DocStream's header list, to report the client's IP address as observed by the web framework
Version 2 by barry
Version 1 by barry