//simple request handler using a thread pool with blocking sockets
//body text sent with the plain-text 404 response when no 404.html page exists
const char MSG_404[] = "HTTP 404: Content not found on this server"
//maximum number of bytes read from a file per read() call while streaming it to a client
const int MAX_CHUNK = 10240
//number of worker threads started by the RequestHandler constructor
const int THREAD_POOL_SIZE = 32
//value passed to timer.sleep() between idle checks, and the amount added to a waiting work item's idle time per check
const int TIMEOUT_INTERVAL = 10
//accumulated idle time (same unit as TIMEOUT_INTERVAL) at which the timer thread disconnects a connection
const int MAX_INACTIVE = 2000
//A parsed HTTP request head, as produced by parseCommand().
data Command {
//request method codes stored in 'type'
const int GET = 1
const int POST = 2
const int PUT = 3
const int DELETE = 4
const int HEAD = 5
//one of the method codes above; stays 0 when the parser does not recognise the verb
int type
//protocol version codes stored in 'version'
const int HTTP_1_0 = 1
const int HTTP_1_1 = 2
//one of the version codes above; 0 means no recognised version was parsed
int version
//value of the "host" header, if one was sent
char hostname[]
//request target exactly as it appeared between the verb and the version (case is preserved)
char resource[]
//characters following the last "." seen in the request target, if any
char ext[]
//set from the "connection" header: true when its value contains "keep-alive"
bool persistent
//all parsed header lines; keys are lowercased by the parser
Header headers[]
}
//One accepted client connection queued for a worker thread.
//NOTE: processStream() constructs this type positionally, so the order of the first five fields must not change.
data WorkItem {
//the underlying TCP connection to the client
TCPSocket client
//TLS session wrapping the connection, or null for plain HTTP
TLS tlsClient
//the stream requests are read from and responses written to (the TLS session if present, otherwise the socket)
Stream stream
//landing page names, passed in from cfg.landingPages by processStream()
String landingPages[]
//server configuration used while serving this connection
ConfigData cfg
//idle time accumulated by the timer thread while 'waiting' is true; reset to 0 whenever data arrives
int idleTime
//true while the worker is blocked waiting on the client (reading a request head or in the TLS accept)
bool waiting
//set by the timer thread when it disconnects this connection for inactivity
bool killed
//ID of the worker thread this item was assigned to
int threadID
//next item in the owning worker's queue
WorkItem next
//NOTE(review): not referenced by any code visible in this file
WorkItem prev
}
//State of one pool thread; the instance itself is used as the mutex guarding its queue fields.
data WorkerThread {
//handle of the running worker, used to signal() and join() it
Thread thread
//index of this worker in the thread pool
int threadID
//head of the queue of pending work items
WorkItem work
//tail of the queue of pending work items (null when the queue is empty)
WorkItem lastWork
//the item currently being processed, or null when idle; inspected by the timer thread
WorkItem curWork
//number of items assigned to this worker that have not finished yet (queued plus in progress)
int workCount
}
//when true, parseCommand() throws an exception describing a malformed request before returning null
const bool DEBUG = false
component provides RequestHandler(Destructor) requires io.Output out, io.FileSystem fileSystem, io.File, ws.DocStream, data.StringUtil stringUtil,
data.IntUtil intUtil, ws.Web rh, time.Timer timer, net.http.Util util, time.Calendar cal, net.TLS {
//prefix prepended to every requested resource to build the file system path
char web_root[] = "./"
//the worker pool; entries are created by the constructor
WorkerThread threadPool[] = new WorkerThread[THREAD_POOL_SIZE]
//held by getWorker() while it scans the pool for the least-loaded worker
Mutex workerLock = new Mutex()
//byte sequences used by readCommand() to detect the end of a request head
char END_SEQ[] = "\r\n\r\n"
char END_R[] = "\r"
char END_N[] = "\n"
//serialises appends to the log file
Mutex logLock = new Mutex()
//set by destroy(); tells worker and timer threads to exit
bool shutdown
//file that logState() and logRequest() append to
char logFile[] = "log_requests.txt"
//enables logState() output
bool logStates = false
//enables logRequest() output
bool logRequests = false
//Appends one "<timestamp>,<msg>" line to the log file.
// - does nothing unless state logging is enabled and a log file is configured
// - the append is performed while holding logLock
void logState(char msg[])
	{
	if (!logStates || logFile == null) return
	
	mutex(logLock)
		{
		File log = new File(logFile, File.WRITE)
		//position at the end of the file so that we append
		log.setPos(log.getSize())
		log.write(new char[](cal.getMS().makeString(), ",", msg, "\n"))
		log.close()
		}
	}
//Appends one "<timestamp>,<ip>,<port>,<type>,<uri>" line to the log file.
// - does nothing unless request logging is enabled and a log file is configured
// - the append is performed while holding logLock
void logRequest(char clientIP[], int clientPort, char type[], char uri[])
	{
	if (!logRequests || logFile == null) return
	
	mutex(logLock)
		{
		File log = new File(logFile, File.WRITE)
		//position at the end of the file so that we append
		log.setPos(log.getSize())
		log.write(new char[](cal.getMS().makeString(), ",", clientIP, ",", clientPort.makeString(), ",", type, ",", uri, "\n"))
		log.close()
		}
	}
//Copies 'len' cells from 'src', starting at index 'offset', into the start of 'dest'.
void copyBytes(byte dest[], byte src[], int offset, int len)
	{
	int n = 0
	while (n < len)
		{
		dest[n] = src[offset + n]
		n ++
		}
	}
//Parses a raw HTTP request head (request line plus header lines) into a Command.
// - buf: the received bytes; the verb, version and header keys are lowercased IN PLACE as they are scanned
// - top: number of valid bytes in buf
// - returns: the parsed Command, or null if the request is malformed (when DEBUG is set an exception describing
//   the problem is thrown first)
//The parser is hand-written so that it can, in one pass, split on spaces / ":" / "\r\n", capture the file
//extension of the resource, and detect keep-alive.
Command parseCommand(char buf[], int top)
	{
	Command cmd = new Command()
	Header headers[]
	
	int i = 0
	//scanner state; its meaning differs between the request-line loop and the header loop
	byte mode = 0
	//index at which the token currently being scanned began
	int start = 0
	//index of the most recent "." seen in the resource (0 = none)
	int dot = 0
	
	//first we parse the request line: modes are 0 = verb, 1 = resource, 2 = version, 3 = expecting "\n"
	for (i = 0; i < top; i++)
		{
		if (mode == 0)
			{
			if (buf[i] == " ")
				{
				//this is the end of the HTTP verb (already lowercased by the branch below)
				char verb[] = new char[i-start]
				copyBytes(verb, buf, start, i-start)
				
				//any other verb leaves cmd.type at 0, which the caller answers with "not implemented"
				if (verb == "get") cmd.type = Command.GET
				else if (verb == "post") cmd.type = Command.POST
				else if (verb == "head") cmd.type = Command.HEAD
				else if (verb == "put") cmd.type = Command.PUT
				else if (verb == "delete") cmd.type = Command.DELETE
				
				mode = 1
				start = i + 1
				}
				else if (buf[i] >= "A" && buf[i] <= "Z")
				{
				//lowercase this
				buf[i] += 32
				}
			}
			else if (mode == 1)
			{
			if (buf[i] == " ")
				{
				//this is the end of the HTTP resource (its case is left untouched)
				char resource[] = new char[i-start]
				copyBytes(resource, buf, start, i-start)
				
				//capture the file extension, if any
				if (dot != 0)
					{
					char ext[] = new char[i-(dot+1)]
					copyBytes(ext, buf, dot+1, i-(dot+1))
					cmd.ext = ext
					}
				
				cmd.resource = resource
				
				mode = 2
				start = i + 1
				}
				else if (buf[i] == ".")
				{
				dot = i
				}
			}
			else if (mode == 2)
			{
			if (buf[i] == "\r")
				{
				mode = 3
				}
				else if (buf[i] >= "A" && buf[i] <= "Z")
				{
				//lowercase this
				buf[i] += 32
				}
			}
			else if (mode == 3)
			{
			if (buf[i] == "\n")
				{
				//this is the end of the HTTP version; the -1 drops the preceding "\r"
				char version[] = new char[i-start-1]
				copyBytes(version, buf, start, i-start-1)
				
				if (version == "http/1.0") { cmd.version = Command.HTTP_1_0 }
				else if (version == "http/1.1") { cmd.version = Command.HTTP_1_1 }
				
				start = i + 1
				//leave i on the "\n"; the header loop below skips over it
				break
				}
				else
				{
				if (DEBUG) throw new Exception("malformed HTTP request, expected newline in verb")
				return null
				}
			}
		}
	
	if (cmd.resource == null)
		{
		if (DEBUG) throw new Exception("malformed HTTP request, missing resource in verb")
		return null
		}
	
	if (cmd.version == 0)
		{
		if (DEBUG) throw new Exception("malformed HTTP request, missing version in verb")
		return null
		}
	
	//next we have headers: modes are 0 = key, 1 = value, 2 = expecting "\n"
	char hkey[]
	char hval[]
	mode = 0
	for (; i < top; i++)
		{
		if (mode == 0)
			{
			if (buf[i] == ":")
				{
				//trim any spaces between the key and the colon
				int end = i
				while (end > start && buf[end-1] == " ") end --
				
				if (end <= start)
					{
					if (DEBUG) throw new Exception("malformed HTTP request, missing key in header line")
					return null
					}
				
				hkey = new char[end-start]
				copyBytes(hkey, buf, start, end-start)
				
				mode = 1
				start = i + 1
				}
				else if (buf[i] == "\n")
				{
				//a line with no colon (including the blank line that ends the head) carries no header;
				//restart the key here so that it cannot leak into the next header's key
				start = i + 1
				}
				else if (buf[i] >= "A" && buf[i] <= "Z")
				{
				//lowercase this
				buf[i] += 32
				}
			}
			else if (mode == 1)
			{
			if (buf[i] == "\r")
				{
				mode = 2
				}
			}
			else if (mode == 2)
			{
			if (buf[i] == "\n")
				{
				//skip leading spaces of the value; this stops at the "\r" at index i-1 at the latest
				while (buf[start] == " ") start ++
				
				if (start >= i-1)
					{
					//an empty header value is legal in HTTP; record it as a null value rather than
					//allocating a zero-length array
					hval = null
					}
					else
					{
					hval = new char[i-start-1]
					copyBytes(hval, buf, start, i-start-1)
					}
				
				headers = new Header[](headers, new Header(hkey, hval))
				
				//special header checks
				if (hkey == "host") cmd.hostname = hval
				if (hkey == "connection")
					{
					cmd.persistent = false
					if (hval != null) cmd.persistent = hval.ifind("keep-alive") != StringUtil.NOT_FOUND
					}
				
				mode = 0
				start = i + 1
				}
				else
				{
				if (DEBUG) throw new Exception("malformed HTTP request, expected newline in header")
				return null
				}
			}
		}
	
	cmd.headers = headers
	
	return cmd
	}
//Reads one HTTP request head from 'socket', one byte at a time, until the terminating "\r\n\r\n" is seen
//or the peer stops sending, then parses it.
// - w: the work item for this connection; its 'waiting' flag is raised while we block so that the timer
//   thread can count idle time, and 'idleTime' is reset whenever a byte arrives
// - returns: the parsed Command, or null if nothing was received, the connection was killed by the timer
//   thread, the request head exceeds the size limit, or the request is malformed
Command readCommand(WorkItem w, Stream socket) {
	//upper bound on the size of a request head, so that a client cannot make us buffer without limit
	int maxHeadSize = 65536
	
	w.idleTime = 0
	w.waiting = true
	
	char buf[] = new char[2048]
	int offset = 0
	bool tooLarge = false
	
	//sliding window over the last four bytes received, oldest first
	byte l4A
	byte l4B
	byte l4C
	byte l4D
	
	while (true)
		{
		char b[] = socket.recv(1)
		
		//nothing received: the connection was closed (possibly by the timer thread)
		if (b.arrayLength == 0) break
		
		w.idleTime = 0
		
		if (offset == buf.arrayLength)
			{
			if (offset >= maxHeadSize)
				{
				tooLarge = true
				break
				}
			
			//double the buffer instead of re-allocating it for every additional byte
			buf = new char[](buf, new char[buf.arrayLength])
			}
		
		buf[offset] = b[0]
		offset ++
		
		l4A = l4B
		l4B = l4C
		l4C = l4D
		l4D = b[0]
		
		if (l4A == END_R && l4B == END_N && l4C == END_R && l4D == END_N)
			break
		}
	
	w.waiting = false
	
	if (w.killed)
		{
		return null
		}
	
	if (tooLarge || offset == 0)
		{
		return null
		}
	
	//only the first 'offset' cells of buf hold received data
	return parseCommand(buf, offset)
	}
//Looks up a request header by name.
// - key is lowercased before comparison, matching the lowercased keys stored by the parser
// - returns the value of the first header with that key, or null if there is none
char[] getHeaderValue(Command cmd, char key[])
	{
	char wanted[] = stringUtil.lowercase(key)
	
	int n = 0
	while (n < cmd.headers.arrayLength)
		{
		Header h = cmd.headers[n]
		if (h.key == wanted) return h.value
		n ++
		}
	
	return null
	}
//Hands a GET request to the web application; 'path' is not used.
// - returns whatever the application's get() reports
bool serveFunction_GET(DocStream ds, char path[], char command[]) {
	bool handled = rh.get(command, ds)
	return handled
	}
//Hands a POST request, with its content type and payload, to the web application; 'path' is not used.
// - returns whatever the application's post() reports
bool serveFunction_POST(DocStream ds, char path[], char command[], char ctype[], byte payload[]) {
	bool handled = rh.post(command, ctype, payload, ds)
	return handled
	}
//Maps a file extension to a MIME type using the configured table.
// - returns "application/octet-stream" when ext is null or has no entry in mimeTypes
char[] getMIMEType(char ext[], KeyVal mimeTypes[])
	{
	char result[] = "application/octet-stream"
	
	if (ext != null)
		{
		int n = 0
		while (n < mimeTypes.arrayLength)
			{
			if (mimeTypes[n].key == ext)
				{
				//first matching entry wins
				result = mimeTypes[n].val
				break
				}
			n ++
			}
		}
	
	return result
	}
//Finds the resource prefix configured for a host name.
// - returns the value of the first subdomains entry whose key equals host, or null if none matches
char[] getResourcePrepend(char host[], KeyVal subdomains[])
	{
	char prefix[] = null
	
	int n = 0
	while (n < subdomains.arrayLength)
		{
		if (subdomains[n].key == host)
			{
			prefix = subdomains[n].val
			break
			}
		n ++
		}
	
	return prefix
	}
//Sends a file from below web_root as a complete "200 OK" HTTP response.
// - command: the requested resource, appended to web_root to form the file path
// - ext: file extension used to select the Content-Type
// - s: stream the response is written to
// - cfg: supplies landing pages, MIME types and extra response headers
// - keepAlive: selects the Connection header that is sent
// - includeContent: when false only the headers are sent (used for HEAD)
// - returns: true if a response was sent; false if the resource does not exist, is a directory with no
//   landing page, could not be opened, or was rejected as unsafe (nothing is sent in those cases)
bool serveFile(char command[], char ext[], Stream s, ConfigData cfg, bool keepAlive, bool includeContent)
	{
	//the resource comes straight from the client: refuse parent-directory references so that a request
	//cannot escape web_root
	if (stringUtil.find(command, "..") != StringUtil.NOT_FOUND) return false
	
	char path[] = new char[](web_root, command)
	
	if (fileSystem.exists(path))
		{
		if (fileSystem.getInfo(path).type == FileInfo.TYPE_DIR)
			{
			//make sure there is a separator between the directory and the landing page name
			char dir[] = path
			if (dir[dir.arrayLength-1] != "/") dir = new char[](dir, "/")
			
			//try common landing pages
			bool found = false
			for (int i = 0; i < cfg.landingPages.arrayLength; i++)
				{
				char test[] = new char[](dir, cfg.landingPages[i].string)
				if (fileSystem.exists(test))
					{
					if (cfg.landingPages[i].string.endsWith(".html")) ext = "html"
					path = test
					found = true
					break
					}
				}
			
			if (!found)
				{
				return false
				}
			}
		
		File fd = new File(path, File.READ)
		
		//the file may have vanished or be unreadable; report "not served" instead of failing mid-response
		if (fd == null) return false
		
		s.send("HTTP/1.1 200 OK\r\n")
		s.send("Server: Dana Web Engine\r\n")
		if (!keepAlive)
			s.send("Connection: close\r\n")
			else
			s.send("Connection: keep-alive\r\n")
		s.send("Content-length: $(intUtil.makeString(fd.getSize()))\r\n")
		s.send("Content-Type: $(getMIMEType(ext, cfg.mimeTypes))\r\n")
		
		//extra response headers from the configuration
		for (int i = 0; i < cfg.xheaders.arrayLength; i++)
			{
			s.send(cfg.xheaders[i].string)
			s.send("\r\n")
			}
		
		s.send("Date: $(util.getDateString(cal.getTime()))\r\n")
		s.send("Last-Modified: $(util.getDateString(fileSystem.getInfo(path).modified))\r\n")
		s.send("\r\n")
		
		if (includeContent)
			{
			//stream the file in chunks rather than loading it all into memory
			while (!fd.eof())
				{
				byte buf[] = fd.read(MAX_CHUNK)
				s.send(buf)
				}
			}
		
		fd.close()
		
		return true
		}
	
	return false
	}
//Decides whether a resource should be served directly from the file system.
// - returns true when 'command' begins with any of the configured static-serve prefixes
bool staticFile(char command[], String staticServe[])
	{
	int n = 0
	while (n < staticServe.arrayLength)
		{
		char prefix[] = staticServe[n].string
		
		//a prefix longer than the command can never match
		if (command.arrayLength >= prefix.arrayLength)
			{
			if (stringUtil.subString(command, 0, prefix.arrayLength) == prefix) return true
			}
		
		n ++
		}
	
	return false
	}
//Serves every request arriving on one client connection, then closes the connection.
// - performs the TLS accept first when the work item carries a TLS session
// - loops reading requests for as long as the client asks for keep-alive
// - GET / HEAD of a static-serve resource are answered from the file system; other GET and all POST
//   requests are passed to the web application; any other method is answered with 501
// - a request that nobody handled is answered with 404
//Whenever this function blocks on the client it raises work.waiting, so that the timer thread can
//disconnect a client that stays silent for too long.
void processRequest(WorkItem work)
	{
	String staticServe[] = work.cfg.staticServe
	
	bool keepAlive = true
	
	TCPSocket s = work.client
	TLS tlsSocket = work.tlsClient
	Stream stream = work.stream
	
	NetworkEndpoint endpoint = s.getRemoteEndpoint()
	
	if (tlsSocket != null)
		{
		if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),https,starting")
		
		//the handshake blocks on the client, so let the timer thread watch it
		work.idleTime = 0
		work.waiting = true
		tlsSocket.accept(s)
		work.waiting = false
		
		if (work.killed) keepAlive = false
		
		if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),https,started,$(work.killed)")
		}
		else
		{
		if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),http,start")
		}
	
	while (keepAlive)
		{
		//parse headers, extract command
		//call function associated with command, return result (as HTTP response)
		Command cmd = readCommand(work, stream)
		
		if (cmd == null)
			{
			//malformed / interrupted / idle-timeout request (this is normal behaviour)
			break
			}
		
		keepAlive = cmd.persistent
		
		char subDomainPath[] = getResourcePrepend(cmd.hostname, work.cfg.subdomains)
		
		//never assigned; passed through as the (unused) 'path' argument of the serve functions
		char handler[]
		
		bool success = false
		
		if (cmd.type == Command.GET)
			{
			logRequest(endpoint.address, endpoint.port, "get", cmd.resource)
			
			if (staticFile(cmd.resource, staticServe))
				{
				if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),file,$(cmd.resource)")
				success = serveFile(cmd.resource, cmd.ext, stream, work.cfg, keepAlive, true)
				}
				else
				{
				if (subDomainPath != null) cmd.resource = "/$subDomainPath$(cmd.resource)"
				if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),get,$(cmd.resource)")
				DocStream ds = new DocStream(stream, new Header[](new Header("x-client-ip", endpoint.address), cmd.headers))
				success = serveFunction_GET(ds, handler, cmd.resource)
				if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),get-ok")
				}
			}
			else if (cmd.type == Command.HEAD)
			{
			if (staticFile(cmd.resource, staticServe))
				{
				success = serveFile(cmd.resource, cmd.ext, stream, work.cfg, keepAlive, false)
				}
				else
				{
				//TODO. -- what's the best option here? a Web.head() method? or just some auto-response?
				}
			}
			else if (cmd.type == Command.POST)
			{
			if (subDomainPath != null) cmd.resource = "/$subDomainPath$(cmd.resource)"
			if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),post,$(cmd.resource)")
			logRequest(endpoint.address, endpoint.port, "post", cmd.resource)
			DocStream ds = new DocStream(stream, new Header[](new Header("x-client-ip", endpoint.address), cmd.headers))
			
			//read the payload first
			char ctype[] = getHeaderValue(cmd, "content-type")
			char clen[] = getHeaderValue(cmd, "content-length")
			int plen = 0
			if (clen != null) plen = intUtil.intFromString(clen)
			
			byte payload[]
			if (plen > 0)
				{
				//reading the body blocks on the client, so it must be covered by the idle timer
				//just like reading the request head is
				work.idleTime = 0
				work.waiting = true
				payload = stream.recv(plen)
				work.waiting = false
				
				//the timer thread disconnected us while we were waiting for the body
				if (work.killed) break
				}
			
			success = serveFunction_POST(ds, handler, cmd.resource, ctype, payload)
			if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),post-ok")
			}
			else
			{
			//unsupported method: any request body is left unread, so the connection cannot safely be
			//reused for another request; say so and close after this response
			keepAlive = false
			stream.send("HTTP/1.1 501 Not Implemented\r\n")
			stream.send("Connection: close\r\n")
			stream.send("Content-length: 0\r\n")
			stream.send("\r\n")
			success = true
			}
		
		if (!success)
			{
			if (fileSystem.exists("404.html"))
				{
				//NOTE(review): serveFile() always writes a "200 OK" status line, so the custom 404 page
				//is delivered with status 200; fixing that needs a status parameter on serveFile()
				serveFile("404.html", "html", stream, work.cfg, keepAlive, cmd.type != Command.HEAD)
				}
				else
				{
				stream.send("HTTP/1.1 404 Resource Not Found\r\n")
				stream.send("Server: Dana Web Engine\r\n")
				if (keepAlive)
					stream.send("Connection: keep-alive\r\n")
					else
					stream.send("Connection: close\r\n")
				stream.send("Content-length: $(MSG_404.arrayLength)\r\n")
				stream.send("Content-type: text/plain \r\n")
				stream.send("Date: $(util.getDateString(cal.getTime()))\r\n")
				stream.send("\r\n")
				//a response to HEAD carries the headers only, never a body
				if (cmd.type != Command.HEAD) stream.send(MSG_404)
				}
			}
		}
	
	if (tlsSocket != null)
		{
		tlsSocket.close()
		if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),https,end")
		}
		else
		{
		if (logStates) logState("process_request,$(endpoint.address),$(endpoint.port),http,end")
		}
	
	s.disconnect()
	}
//Constructor: initialises the web application, starts the worker pool, then starts the idle-timeout thread.
RequestHandler:RequestHandler()
	{
	//init tasks for web app
	rh.setup()
	
	//start thread pool
	for (int i = 0; i < THREAD_POOL_SIZE; i++)
		{
		//fully initialise the worker's state before its thread starts, so that the thread never
		//observes (or logs) a threadID that has not been assigned yet
		WorkerThread wt = new WorkerThread()
		wt.threadID = i
		threadPool[i] = wt
		wt.thread = asynch::workerThread(wt)
		}
	
	asynch::timerThread()
	}
//Body of one pool thread: sleeps until signalled, then takes one work item off the front of its own
//queue and processes it. Exits when signalled with no outstanding work after shutdown has been set.
// - state: this worker's queue and counters; the instance is also the mutex guarding them
void workerThread(store WorkerThread state)
	{
	while (true)
		{
		this.thread.wait()
		
		if (logStates) logState("worker-thread-signalled,$(state.threadID),$(state.workCount)")
		
		if (state.workCount == 0)
			{
			//woken with nothing to do: only leave if we are shutting down
			if (shutdown) return
			}
			else
			{
			WorkItem job
			
			//detach the head of the queue and publish it as the current job (for the timer thread)
			mutex(state)
				{
				job = state.work
				state.work = job.next
				if (state.work == null) state.lastWork = null
				state.curWork = job
				}
			
			processRequest(job)
			
			mutex(state)
				{
				state.workCount --
				state.curWork = null
				job = null
				if (logStates) logState("worker-job-end,$(state.threadID),$(state.workCount)")
				}
			}
		}
	}
//Idle-timeout watchdog: every TIMEOUT_INTERVAL it visits each worker's current job, adds the interval to
//the job's idle time if the job is blocked waiting on its client, and disconnects any job whose idle
//time has reached MAX_INACTIVE. Runs until shutdown is set.
void timerThread()
	{
	while (!shutdown)
		{
		timer.sleep(TIMEOUT_INTERVAL)
		
		for (int i = 0; i < threadPool.arrayLength; i++)
			{
			mutex(threadPool[i])
				{
				WorkItem curWork = threadPool[i].curWork
				
				if (curWork != null)
					{
					if (curWork.waiting)
						{
						curWork.idleTime += TIMEOUT_INTERVAL
						}
					
					if (curWork.idleTime >= MAX_INACTIVE)
						{
						//raise the flag BEFORE closing the connection: closing wakes the blocked worker,
						//which checks 'killed' straight away and must not see it still unset
						curWork.killed = true
						curWork.idleTime = 0
						
						if (curWork.tlsClient != null) curWork.tlsClient.close()
						curWork.client.disconnect()
						}
					}
				}
			}
		}
	}
//Chooses the worker with the fewest outstanding work items (the lowest index wins a tie).
// - the scan is performed while holding workerLock
WorkerThread getWorker()
	{
	int pick = 0
	
	mutex(workerLock)
		{
		int k = 1
		while (k < threadPool.arrayLength)
			{
			if (threadPool[pick].workCount > threadPool[k].workCount) pick = k
			k ++
			}
		}
	
	return threadPool[pick]
	}
//Accepts a new client connection: wraps it in a work item, appends that to the queue of the least-loaded
//worker, and wakes the worker.
// - s: the connected client socket
// - tlsCtx: TLS context for HTTPS connections, or null for plain HTTP
// - cfg: server configuration to serve this connection with
void RequestHandler:processStream(store TCPSocket s, store TLSContext tlsCtx, store ConfigData cfg)
	{
	NetworkEndpoint endpoint = s.getRemoteEndpoint()
	WorkerThread worker = getWorker()
	
	//requests are read from the TLS session when there is one, otherwise straight from the socket
	TLS secure
	Stream channel
	if (tlsCtx == null)
		{
		channel = s
		}
		else
		{
		secure = new TLS(tlsCtx)
		channel = secure
		}
	
	WorkItem item = new WorkItem(s, secure, channel, cfg.landingPages, cfg)
	
	mutex(worker)
		{
		item.threadID = worker.threadID
		
		//append to the tail of the worker's queue
		if (worker.lastWork != null)
			worker.lastWork.next = item
			else
			worker.work = item
		worker.lastWork = item
		
		worker.workCount ++
		
		if (logStates) logState("process_stream,$(endpoint.address),$(endpoint.port),worker-assigned,$(worker.threadID),$(worker.workCount)")
		}
	
	worker.thread.signal()
	}
//Destructor: flags shutdown, then wakes each worker thread in turn and waits for it to exit.
void Destructor:destroy()
	{
	shutdown = true
	
	int n = 0
	while (n < threadPool.arrayLength)
		{
		Thread t = threadPool[n].thread
		t.signal()
		t.join()
		n ++
		}
	}
}