Home | Forum | Source | Research | Guide
Sign in to contribute to source. how it works
Component ws.RequestHandler by barry
expand copy to clipboardexpand
//simple request handler using a thread pool with blocking sockets

//plain-text body sent with a 404 response when no custom 404.html page is available
const char MSG_404[] = "HTTP 404: Content not found on this server"

//maximum number of bytes read from a file per read() call when streaming it to a client
const int MAX_CHUNK = 10240

//number of worker threads created by the RequestHandler constructor
const int THREAD_POOL_SIZE = 32

//value passed to timer.sleep() by the timer thread, and added to a waiting work item's idle time on each tick (presumably milliseconds -- TODO confirm)
const int TIMEOUT_INTERVAL = 10
//accumulated idle time at which the timer thread disconnects a client (same unit as TIMEOUT_INTERVAL)
const int MAX_INACTIVE = 2000

//A parsed HTTP request head, as produced by parseCommand()
data Command {
	//HTTP verb identifiers, stored in "type" (type is left at 0 when the verb is not recognised)
	const int GET = 1
	const int POST = 2
	const int PUT = 3
	const int DELETE = 4
	const int HEAD = 5
	int type
	
	//HTTP version identifiers, stored in "version" (0 means no valid version was parsed)
	const int HTTP_1_0 = 1
	const int HTTP_1_1 = 2
	int version
	
	//value of the "host" request header, if one was sent
	char hostname[]
	
	//the requested resource exactly as it appeared on the request line
	char resource[]
	//the text following the last "." in the resource, if any; used for MIME type lookup
	char ext[]
	
	//true if a "connection" header containing "keep-alive" was received
	bool persistent
	
	//all parsed request headers
	Header headers[]
}

//One client connection queued for, or being handled by, a worker thread
//NOTE: processStream() constructs this type positionally from its first five fields, so field order matters
data WorkItem {
	//the raw TCP connection to the client
	TCPSocket client
	//TLS wrapper for the connection, or null for a plain HTTP connection
	TLS tlsClient
	//the stream that requests are read from and responses written to (either client or tlsClient)
	Stream stream
	//copy of cfg.landingPages, as passed in by processStream()
	String landingPages[]
	//server configuration supplied with the connection
	ConfigData cfg
	//idle time accumulated by the timer thread while "waiting" is true; reset to 0 whenever data arrives
	int idleTime
	//true while the worker is blocked waiting on the client (reading a request, or in the TLS accept)
	bool waiting
	//set by the timer thread after it has disconnected this client for inactivity
	bool killed
	//ID of the worker thread this item was assigned to
	int threadID
	//next item in the owning worker's queue
	WorkItem next
	//not used by any code visible in this file
	WorkItem prev
}

//Per-thread state of one worker in the thread pool; instances are also used as the lock object guarding their own queue
data WorkerThread {
	//handle of the asynchronous workerThread() call serving this state
	Thread thread
	//index of this worker in the thread pool
	int threadID
	
	//head of the queue of pending work items
	WorkItem work
	//tail of the queue of pending work items (null when the queue is empty)
	WorkItem lastWork
	//the item currently being processed, or null; inspected by the timer thread
	WorkItem curWork
	//number of items queued or in progress; used by getWorker() to pick the least-loaded worker
	int workCount
}

//when true, parseCommand() throws an exception describing a malformed request instead of silently returning null
const bool DEBUG = false

component provides RequestHandler(Destructor) requires io.Output out, io.FileSystem fileSystem, io.File, ws.DocStream, data.StringUtil stringUtil,
 data.IntUtil intUtil, ws.Web rh, time.Timer timer, net.http.Util util, time.Calendar cal, net.TLS {
	
	//directory prefix prepended to every resource path served by serveFile()
	char web_root[] = "./"
	
	//the pool of worker threads; populated by the constructor
	WorkerThread threadPool[] = new WorkerThread[THREAD_POOL_SIZE]
	
	//held by getWorker() while it scans the pool for the least-loaded worker
	Mutex workerLock = new Mutex()
	
	//END_R / END_N are compared against the last four received bytes in readCommand() to detect the end of the request head
	//(END_SEQ is not referenced by any code visible in this file)
	char END_SEQ[] = "\r\n\r\n"
	char END_R[] = "\r"
	char END_N[] = "\n"

	//serialises writes to the log file
	Mutex logLock = new Mutex()
	
	//set by destroy() to ask the worker threads and the timer thread to exit
	bool shutdown

	//log file path, and switches enabling state logging (logState) and request logging (logRequest)
	char logFile[] = "log_requests.txt"
	bool logStates = false
	bool logRequests = false
	
	//Appends one line of the form "<timestamp>,<msg>" to the log file.
	//Does nothing unless state logging is switched on and a log file is configured.
	void logState(char msg[])
		{
		if (!logStates || logFile == null) return
		
		mutex(logLock)
			{
			File logFd = new File(logFile, File.WRITE)
			
			//move to the end of the file so that the new line is appended
			logFd.setPos(logFd.getSize())
			
			logFd.write("$(cal.getMS().makeString()),$(msg)\n")
			
			logFd.close()
			}
		}
	
	//Appends one line of the form "<timestamp>,<clientIP>,<clientPort>,<type>,<uri>" to the log file.
	//Does nothing unless request logging is switched on and a log file is configured.
	void logRequest(char clientIP[], int clientPort, char type[], char uri[])
		{
		if (!logRequests || logFile == null) return
		
		mutex(logLock)
			{
			File logFd = new File(logFile, File.WRITE)
			
			//move to the end of the file so that the new line is appended
			logFd.setPos(logFd.getSize())
			
			logFd.write("$(cal.getMS().makeString()),$(clientIP),$(clientPort.makeString()),$(type),$(uri)\n")
			
			logFd.close()
			}
		}
	
	//Copies len elements from src, beginning at src[offset], into dest[0..len-1]
	void copyBytes(byte dest[], byte src[], int offset, int len)
		{
		int n = 0
		
		while (n < len)
			{
			dest[n] = src[offset + n]
			n ++
			}
		}
	
	//Parses the head of an HTTP request (request line followed by header lines) held in buf[0..top-1].
	// - buf is modified in place: the verb, the version and all header keys are lowercased
	//   (the resource and the header values keep their original case)
	// - the verbs get, post, put, delete and head are mapped onto the Command constants; any
	//   other verb leaves cmd.type at 0
	// - the text after the last "." of the resource is stored as cmd.ext
	// - the "host" header is copied to cmd.hostname, and cmd.persistent is set when a
	//   "connection" header contains "keep-alive"
	//Returns null if the request is malformed (or throws an exception instead when DEBUG is true).
	Command parseCommand(char buf[], int top)
		{
		//we use a custom parser here, to detect "\r\n", ":", and spaces
		// - also detect keep-alive right here, and tag it as a boolean on the Command instance
		// - we also perform lowercase operations manually as we go
		// - and we extract the file extension for mime type conversion later
		
		Command cmd = new Command()
		Header headers[]
		
		int i = 0
		
		byte mode = 0
		int start = 0
		int dot = 0
		
		//first we parse the verb / resource / version line
		// modes: 0 = in verb, 1 = in resource, 2 = in version, 3 = seen "\r", expecting "\n"
		for (i = 0; i < top; i++)
			{
			if (mode == 0)
				{
				if (buf[i] == " ")
					{
					//this is the end of the HTTP verb (already lowercased by the branch below)
					char verb[] = new char[i-start]
					copyBytes(verb, buf, start, i-start)
					
					//every verb that the Command type declares is recognised here, so that
					//callers testing for e.g. Command.HEAD can actually see that value
					if (verb == "get") cmd.type = Command.GET
					else if (verb == "post") cmd.type = Command.POST
					else if (verb == "put") cmd.type = Command.PUT
					else if (verb == "delete") cmd.type = Command.DELETE
					else if (verb == "head") cmd.type = Command.HEAD
					
					mode = 1
					
					start = i + 1
					}
					else if (buf[i] >= "A" && buf[i] <= "Z")
					{
					//lowercase this
					buf[i] += 32
					}
				}
				else if (mode == 1)
				{
				if (buf[i] == " ")
					{
					//this is the end of the HTTP resource
					char resource[] = new char[i-start]
					copyBytes(resource, buf, start, i-start)
					
					//capture the file extension, if any (everything after the last "." seen)
					if (dot != 0)
						{
						char ext[] = new char[i-(dot+1)]
						copyBytes(ext, buf, dot+1, i-(dot+1))
						
						cmd.ext = ext
						}
					
					cmd.resource = resource
					
					mode = 2
					
					start = i + 1
					}
					else if (buf[i] == ".")
					{
					dot = i
					}
				}
				else if (mode == 2)
				{
				if (buf[i] == "\r")
					{
					mode = 3
					}
					else if (buf[i] >= "A" && buf[i] <= "Z")
					{
					//lowercase this
					buf[i] += 32
					}
				}
				else if (mode == 3)
				{
				if (buf[i] == "\n")
					{
					//this is the end of the HTTP version (the -1 excludes the "\r" before this "\n")
					char version[] = new char[i-start-1]
					copyBytes(version, buf, start, i-start-1)
					
					if (version == "http/1.0") { cmd.version = Command.HTTP_1_0 }
					else if (version == "http/1.1") { cmd.version = Command.HTTP_1_1 }
					
					start = i + 1
					
					break
					}
					else
					{
					if (DEBUG) throw new Exception("malformed HTTP request, expected newline in verb")
					return null
					}
				}
			}
		
		if (cmd.resource == null)
			{
			if (DEBUG) throw new Exception("malformed HTTP request, missing resource in verb")
			return null
			}
		
		if (cmd.version == 0)
			{
			if (DEBUG) throw new Exception("malformed HTTP request, missing version in verb")
			return null
			}
		
		//next we have headers
		// modes: 0 = in key, 1 = in value, 2 = seen "\r", expecting "\n"
		// (i still indexes the "\n" that ended the request line; that character is skipped harmlessly in mode 0)
		char hkey[]
		char hval[]
		mode = 0
		for (; i < top; i++)
			{
			if (mode == 0)
				{
				if (buf[i] == ":")
					{
					int end = i
					
					//trim any spaces between the key and the colon; the scan starts at the
					//character before the colon, since buf[end] itself is the colon
					while (end > start && buf[end-1] == " ") end --
					
					if (end <= start)
						{
						if (DEBUG) throw new Exception("malformed HTTP request, missing key in header line")
						return null
						}
					
					hkey = new char[end-start]
					copyBytes(hkey, buf, start, end-start)
					
					mode = 1
					
					start = i + 1
					}
					else if (buf[i] >= "A" && buf[i] <= "Z")
					{
					//lowercase this
					buf[i] += 32
					}
				}
				else if (mode == 1)
				{
				if (buf[i] == "\r")
					{
					mode = 2
					}
				}
				else if (mode == 2)
				{
				if (buf[i] == "\n")
					{
					//skip leading spaces of the value (this scan stops at the "\r" at the latest)
					while (buf[start] == " ") start ++
					
					if (i <= start-1)
						{
						if (DEBUG) throw new Exception("malformed HTTP request, missing value in header line")
						return null
						}
					
					//the -1 excludes the "\r" before this "\n"
					hval = new char[i-start-1]
					copyBytes(hval, buf, start, i-start-1)
					
					headers = new Header[](headers, new Header(hkey, hval))
					
					//special header checks
					if (hkey == "host") cmd.hostname = hval
					if (hkey == "connection") cmd.persistent = hval.ifind("keep-alive") != StringUtil.NOT_FOUND
					
					mode = 0
					
					start = i + 1
					}
					else
					{
					if (DEBUG) throw new Exception("malformed HTTP request, expected newline in header")
					return null
					}
				}
			}
		
		cmd.headers = headers
		
		return cmd
		}
	
	//Reads one HTTP request head from socket, one byte at a time, until the sequence "\r\n\r\n"
	//has been received or the connection yields no more data, and then parses it.
	// - w.waiting is true for the duration of the read and w.idleTime is reset on every byte
	//   received, so that the timer thread can detect (and disconnect) an idle client
	// - the receive buffer grows in fixed-size chunks rather than by one byte at a time, and a
	//   request head larger than maxHeaderBytes is rejected, so that a client cannot make the
	//   server buffer an unbounded amount of data
	//Returns the parsed Command, or null if the connection was killed by the timer thread, no
	//data was received, the request head was too large, or the request could not be parsed.
	Command readCommand(WorkItem w, Stream socket) {
		w.idleTime = 0
		w.waiting = true
		
		int chunkSize = 2048
		int maxHeaderBytes = 65536
		
		char buf[] = new char[chunkSize]
		int offset = 0
		bool tooLarge = false
		
		//sliding window over the last four bytes received, oldest first
		byte l4A
		byte l4B
		byte l4C
		byte l4D
		while (true)
			{
			char b[] = socket.recv(1)
			if (b.arrayLength == 0) break
			
			w.idleTime = 0
			
			if (offset >= maxHeaderBytes)
				{
				tooLarge = true
				break
				}
			
			//extend the buffer by a whole chunk when it is full
			if (offset >= buf.arrayLength)
				buf = new char[](buf, new char[chunkSize])
			
			buf[offset] = b[0]
			
			offset ++
			
			l4A = l4B
			l4B = l4C
			l4C = l4D
			l4D = b[0]
			
			if (l4A == END_R && l4B == END_N && l4C == END_R && l4D == END_N)
				break
			}
		
		w.waiting = false
		
		if (w.killed)
			{
			return null
			}
		
		if (offset == 0 || tooLarge)
			{
			return null
			}
		
		//only the first "offset" bytes of buf hold received data
		return parseCommand(buf, offset)
	}
	
	//Looks up a request header by name; the name is lowercased before it is compared against
	//the stored header keys. Returns the value of the first match, or null if there is none.
	char[] getHeaderValue(Command cmd, char key[])
		{
		char wanted[] = stringUtil.lowercase(key)
		
		int idx = 0
		while (idx < cmd.headers.arrayLength)
			{
			if (cmd.headers[idx].key == wanted) return cmd.headers[idx].value
			
			idx ++
			}
		
		return null
		}
	
	//Hands a GET request for "command" to the web application, which writes its response to ds.
	//The path parameter is not used. Returns the web application's result.
	bool serveFunction_GET(DocStream ds, char path[], char command[]) {
		bool handled = rh.get(command, ds)
		return handled
	}
	
	//Hands a POST request for "command", with its content type and payload, to the web application,
	//which writes its response to ds. The path parameter is not used. Returns the web application's result.
	bool serveFunction_POST(DocStream ds, char path[], char command[], char ctype[], byte payload[]) {
		bool handled = rh.post(command, ctype, payload, ds)
		return handled
	}
	
	//Maps a file extension onto a MIME type using the given key/value table (exact match on the key).
	//Returns "application/octet-stream" when ext is null or has no entry in the table.
	char[] getMIMEType(char ext[], KeyVal mimeTypes[])
		{
		if (ext != null)
			{
			int idx = 0
			while (idx < mimeTypes.arrayLength)
				{
				if (mimeTypes[idx].key == ext) return mimeTypes[idx].val
				
				idx ++
				}
			}
		
		return "application/octet-stream"
		}
	
	//Returns the value configured for the given host name in the subdomains table
	//(exact match on the key), or null if the host has no entry.
	char[] getResourcePrepend(char host[], KeyVal subdomains[])
		{
		int idx = 0
		
		while (idx < subdomains.arrayLength)
			{
			if (subdomains[idx].key == host)
				{
				return subdomains[idx].val
				}
			
			idx ++
			}
		
		return null
		}
	
	//Serves the file web_root + command to the client as a "200 OK" response.
	// - command: resource path taken from the request; any path containing ".." is refused
	// - ext: file extension used for the Content-Type lookup in cfg.mimeTypes
	// - s: stream the response is written to
	// - cfg: supplies landingPages (tried in order when the path is a directory), mimeTypes and xheaders
	// - keepAlive: selects the Connection header that is sent
	// - includeContent: when false only the headers are sent (used for HEAD requests)
	//Returns true if a response was sent; returns false, without sending anything, if the path is
	//refused, does not exist, or is a directory with none of the configured landing pages.
	bool serveFile(char command[], char ext[], Stream s, ConfigData cfg, bool keepAlive, bool includeContent)
		{
		//security: command comes straight from the client, so refuse anything containing ".."
		//to stop a request from climbing out of web_root (this is deliberately conservative, and
		//also refuses file names that merely contain two consecutive dots)
		if (command != null)
			{
			if (command.ifind("..") != StringUtil.NOT_FOUND)
				{
				return false
				}
			}
		
		char path[] = new char[](web_root, command)
		
		if (fileSystem.exists(path))
			{
			if (fileSystem.getInfo(path).type == FileInfo.TYPE_DIR)
				{
				//try common landing pages
				bool found = false
				for (int i = 0; i < cfg.landingPages.arrayLength; i++)
					{
					char test[] = new char[](path, cfg.landingPages[i].string)
					
					if (fileSystem.exists(test))
						{
						//a directory request carries no usable extension, so set it from the landing page
						if (cfg.landingPages[i].string.endsWith(".html")) ext = "html"
						
						path = test
						found = true
						break
						}
					}
				
				if (!found)
					{
					return false
					}
				}
			
			
			File fd = new File(path, File.READ)
			s.send("HTTP/1.1 200 OK\r\n")
			s.send("Server: Dana Web Engine\r\n")
			if (!keepAlive)
				s.send("Connection: close\r\n")
				else
				s.send("Connection: keep-alive\r\n")
			s.send("Content-length: $(intUtil.makeString(fd.getSize()))\r\n")
			s.send("Content-Type: $(getMIMEType(ext, cfg.mimeTypes))\r\n")
			//extra headers from the configuration, each sent as one header line
			for (int i = 0; i < cfg.xheaders.arrayLength; i++)
				{
				s.send(cfg.xheaders[i].string)
				s.send("\r\n")
				}
			s.send("Date: $(util.getDateString(cal.getTime()))\r\n")
			s.send("Last-Modified: $(util.getDateString(fileSystem.getInfo(path).modified))\r\n")
			s.send("\r\n")
			
			if (includeContent)
				{
				//stream the file in chunks of at most MAX_CHUNK bytes
				while (!fd.eof())
					{
					byte buf[] = fd.read(MAX_CHUNK)
					s.send(buf)
					}
				}
			
			fd.close()
			
			return true
			}
		
		return false
		}
	
	//Returns true if command begins with any of the prefixes listed in staticServe,
	//i.e. if the resource is to be served directly from the file system.
	bool staticFile(char command[], String staticServe[])
		{
		int idx = 0
		
		while (idx < staticServe.arrayLength)
			{
			char prefix[] = staticServe[idx].string
			
			//only compare when command is at least as long as the prefix
			if (command.arrayLength >= prefix.arrayLength)
				{
				if (stringUtil.subString(command, 0, prefix.arrayLength) == prefix) return true
				}
			
			idx ++
			}
		
		return false
		}
	
	//Sends the custom "not found" page (web_root + "404.html") with a 404 status line.
	//The body is only sent when includeContent is true. Returns false, without sending
	//anything, if the custom page does not exist.
	bool serveNotFoundPage(Stream s, ConfigData cfg, bool keepAlive, bool includeContent)
		{
		char path[] = new char[](web_root, "404.html")
		
		if (!fileSystem.exists(path)) return false
		
		char mime[] = getMIMEType("html", cfg.mimeTypes)
		
		File fd = new File(path, File.READ)
		s.send("HTTP/1.1 404 Resource Not Found\r\n")
		s.send("Server: Dana Web Engine\r\n")
		if (keepAlive)
			s.send("Connection: keep-alive\r\n")
			else
			s.send("Connection: close\r\n")
		s.send("Content-length: $(intUtil.makeString(fd.getSize()))\r\n")
		s.send("Content-Type: $mime\r\n")
		for (int i = 0; i < cfg.xheaders.arrayLength; i++)
			{
			s.send(cfg.xheaders[i].string)
			s.send("\r\n")
			}
		s.send("Date: $(util.getDateString(cal.getTime()))\r\n")
		s.send("\r\n")
		
		if (includeContent)
			{
			while (!fd.eof())
				{
				byte buf[] = fd.read(MAX_CHUNK)
				s.send(buf)
				}
			}
		
		fd.close()
		
		return true
		}
	
	//Serves one client connection until it is closed: performs the TLS accept if the work item
	//has a TLS client, then reads and answers requests for as long as the client asks for
	//keep-alive, and finally closes the connection.
	// - GET: served from the file system if the resource matches a static-serve prefix,
	//   otherwise passed to the web application
	// - HEAD: answered with headers only for static resources
	// - POST: the payload is read according to the content-length header and passed to the
	//   web application
	// - anything else: answered with "501 Not Implemented"
	//A request that could not be served is answered with a 404 status, using the custom
	//404.html page if one exists and a plain-text message otherwise.
	void processRequest(WorkItem work)
		{
		String staticServe[] = work.cfg.staticServe
		
		bool keepAlive = true

		TCPSocket s = work.client
		TLS tlsSocket = work.tlsClient
		Stream stream = work.stream

		NetworkEndpoint endpoint = s.getRemoteEndpoint()
		
		if (tlsSocket != null)
			{
			if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),https,starting")

			//the TLS accept blocks on the client, so mark the item as waiting to let the timer thread time it out
			work.idleTime = 0
			work.waiting = true
			tlsSocket.accept(s)
			work.waiting = false
			if (work.killed) keepAlive = false
			
			if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),https,started,$(work.killed)")
			}
			else
			{
			if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),http,start")
			}
		
		while (keepAlive)
			{
			//parse headers, extract command
			//call function associated with command, return result (as HTTP response)
			Command cmd = readCommand(work, stream)
			
			if (cmd == null)
				{
				//malformed / interrupted / idle-timeout request (this is normal behaviour)
				break
				}
			
			keepAlive = cmd.persistent
			
			char subDomainPath[] = getResourcePrepend(cmd.hostname, work.cfg.subdomains)
			//never assigned; passed as the (unused) path argument of the serveFunction_* helpers
			char handler[]
			bool success = false
			
			if (cmd.type == Command.GET)
				{
				logRequest(endpoint.address, endpoint.port, "get", cmd.resource)

				if (staticFile(cmd.resource, staticServe))
					{
					if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),file,$(cmd.resource)")
					success = serveFile(cmd.resource, cmd.ext, stream, work.cfg, keepAlive, true)
					}
					else 
					{
					if (subDomainPath != null) cmd.resource = "/$subDomainPath$(cmd.resource)"
					
					if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),get,$(cmd.resource)")
					DocStream ds = new DocStream(stream, new Header[](new Header("x-client-ip", endpoint.address), cmd.headers))
					success = serveFunction_GET(ds, handler, cmd.resource)
					if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),get-ok")
					}
				}
				else if (cmd.type == Command.HEAD)
				{
				if (staticFile(cmd.resource, staticServe))
					{
					success = serveFile(cmd.resource, cmd.ext, stream, work.cfg, keepAlive, false)
					}
					else 
					{
					//TODO. -- what's the best option here? a Web.head() method? or just some auto-response?
					}
				}
				else if (cmd.type == Command.POST)
				{
				if (subDomainPath != null) cmd.resource = "/$subDomainPath$(cmd.resource)"

				if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),post,$(cmd.resource)")

				logRequest(endpoint.address, endpoint.port, "post", cmd.resource)
				
				DocStream ds = new DocStream(stream, new Header[](new Header("x-client-ip", endpoint.address), cmd.headers))
				
				//read the payload first
				char ctype[] = getHeaderValue(cmd, "content-type")
				int plen = intUtil.intFromString(getHeaderValue(cmd, "content-length"))
				byte payload[] = stream.recv(plen)
				success = serveFunction_POST(ds, handler, cmd.resource, ctype, payload)

				if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),post-ok")
				}
				else
				{
				stream.send("HTTP/1.1 501 Not Implemented\r\n")
				stream.send("\r\n")
				success = true
				}
			
			if (!success)
				{
				//a response to a HEAD request must not carry a body
				bool wantBody = cmd.type != Command.HEAD
				
				//the custom page is sent with a 404 status line (serveFile would send it as "200 OK")
				if (!serveNotFoundPage(stream, work.cfg, keepAlive, wantBody))
					{
					stream.send("HTTP/1.1 404 Resource Not Found\r\n")
					stream.send("Server: Dana Web Engine\r\n")
					if (keepAlive)
						stream.send("Connection: keep-alive\r\n")
						else
						stream.send("Connection: close\r\n")
					stream.send("Content-length: $(MSG_404.arrayLength)\r\n")
					stream.send("Content-type: text/plain \r\n")
					stream.send("Date: $(util.getDateString(cal.getTime()))\r\n")
					stream.send("\r\n")
					if (wantBody) stream.send(MSG_404)
					}
				}
			}
		

		if (tlsSocket != null)
			{
			tlsSocket.close()
			if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),https,end")
			}
			else
			{
			if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),http,end")
			}
		
		s.disconnect()
		}
	
	//Constructor: initialises the web application, starts one asynchronous worker
	//per slot of the thread pool, and then starts the idle-connection timer thread.
	RequestHandler:RequestHandler()
		{
		//init tasks for web app
		rh.setup()

		//start thread pool
		int slot = 0
		while (slot < THREAD_POOL_SIZE)
			{
			WorkerThread wt = new WorkerThread()
			threadPool[slot] = wt
			
			wt.thread = asynch::workerThread(wt)
			wt.threadID = slot
			
			slot ++
			}
		
		asynch::timerThread()
		}
	
	//Body of one pool thread. Each time the thread is signalled it takes at most one work
	//item from the head of its queue and processes it; when signalled with no work
	//outstanding it exits if shutdown has been requested.
	void workerThread(store WorkerThread state)
		{
		while (true)
			{
			this.thread.wait()

			if (logStates) logState("worker-thread-signalled,$(state.threadID),$(state.workCount)")
			
			if (state.workCount == 0)
				{
				if (shutdown) return
				}
				else
				{
				WorkItem job
				
				//detach the head of the queue and publish it as the item in progress
				mutex(state)
					{
					job = state.work
					state.work = job.next
					if (state.work == null) state.lastWork = null
					state.curWork = job
					}
				
				processRequest(job)
				
				//the item is finished: it no longer counts towards this worker's load
				mutex(state)
					{
					state.workCount --
					state.curWork = null
					job = null

					if (logStates) logState("worker-job-end,$(state.threadID),$(state.workCount)")
					}
				}
			}
		}
	
	//Idle-connection watchdog: runs until shutdown is set. On every tick it adds TIMEOUT_INTERVAL
	//to the idle time of each in-progress work item that is waiting on its client, and
	//disconnects any client whose idle time has reached MAX_INACTIVE, marking the item as killed.
	void timerThread()
		{
		while (!shutdown)
			{
			timer.sleep(TIMEOUT_INTERVAL)

			int slot = 0
			while (slot < threadPool.arrayLength)
				{
				WorkerThread wt = threadPool[slot]
				
				mutex(wt)
					{
					WorkItem item = wt.curWork
					
					if (item != null)
						{
						if (item.waiting) item.idleTime += TIMEOUT_INTERVAL

						if (item.idleTime >= MAX_INACTIVE)
							{
							//closing the connection makes the worker's blocking call return
							if (item.tlsClient != null) item.tlsClient.close()
							item.client.disconnect()
							item.idleTime = 0
							item.killed = true
							}
						}
					}
				
				slot ++
				}
			}
		}
	
	//Returns the pool worker with the smallest workCount (the lowest-indexed one on a tie)
	WorkerThread getWorker()
		{
		int best = 0
		
		mutex(workerLock)
			{
			int candidate = 1
			while (candidate < threadPool.arrayLength)
				{
				if (threadPool[candidate].workCount < threadPool[best].workCount)
					{
					best = candidate
					}
				
				candidate ++
				}
			}
		
		return threadPool[best]
		}
	
	//Accepts a new client connection: wraps it in a work item (with a TLS stream if a TLS
	//context is given), appends the item to the queue of the least-loaded worker, and signals
	//that worker's thread.
	void RequestHandler:processStream(store TCPSocket s, store TLSContext tlsCtx, store ConfigData cfg)
		{
		TLS tls

		NetworkEndpoint endpoint = s.getRemoteEndpoint()

		WorkerThread worker = getWorker()
		
		//requests are read from the TLS wrapper when there is one, and from the raw socket otherwise
		Stream stream = s

		if (tlsCtx != null)
			{
			tls = new TLS(tlsCtx)
			stream = tls
			}
		
		WorkItem item = new WorkItem(s, tls, stream, cfg.landingPages, cfg)
		
		mutex(worker)
			{
			item.threadID = worker.threadID

			//append to the tail of the queue, or start a new queue if it is empty
			if (worker.lastWork != null)
				{
				worker.lastWork.next = item
				}
				else
				{
				worker.work = item
				}
			
			worker.lastWork = item
			
			worker.workCount ++

			if (logStates) logState("process_stream,$(endpoint.address),$(endpoint.port),worker-assigned,$(worker.threadID),$(worker.workCount)")
			}
		
		worker.thread.signal()
		}
	
	//Destructor: requests shutdown, then wakes each worker thread in turn and waits for it to finish
	void Destructor:destroy()
		{
		shutdown = true

		int slot = 0
		while (slot < threadPool.arrayLength)
			{
			Thread t = threadPool[slot].thread
			
			t.signal()
			t.join()
			
			slot ++
			}
		}
	
	}
Revision history
To propose a new revision to this entity, use: `dana source put -uc your/new/version.dn -n ws.RequestHandler -m "reason for update" -u yourUsername`
Version 7 by barry
Version 6 (this version) by barry
Notes for this version: Turns off basic header exceptions unless a debug flag is used, to prevent them filling up log files
Version 5 by barry
Version 4 by barry
Version 3 by barry
Version 2 by barry
Version 1 by barry