//simple request handler using a thread pool with blocking sockets
const char MSG_404[] = "HTTP 404: Content not found on this server"
const int MAX_CHUNK = 10240
const int BUF_SIZE_INCREMENT = 2048
const int RECV_SIZE = 2048
const int MAX_HEADER = 10240 //10KB
const int MAX_PAYLOAD = 1048576 //1MB
const int REQ_THREAD_POOL_SIZE = 32
const int NET_THREAD_POOL_SIZE = 1
const int MAX_INACTIVE = 2000
const int MONITOR_WAIT_TIME = 100
const int RECV_CONNECTION_CONTINUE = 1
const int RECV_CONNECTION_CLOSE = 2
data Command {
const int GET = 1
const int POST = 2
const int PUT = 3
const int DELETE = 4
const int HEAD = 5
int type
const int HTTP_1_0 = 1
const int HTTP_1_1 = 2
int version
char hostname[]
char resource[]
char ext[]
bool persistent
Header headers[]
}
data WorkItem {
TCPSocket client
TLS tlsClient
Stream stream
String landingPages[]
ConfigData cfg
bool killed
int threadID
NetWorkItem fromNetItem
Command nextCommand
byte payload[]
WorkItem next
}
data NetWorkItem nocycle {
TCPSocket client
TLS tlsClient
Stream stream
ConfigData cfg
int idleTime
bool waiting
bool killed
int workID
const byte S_WAIT_REQUEST = 0
const byte S_WAIT_PAYLOAD = 1
const byte S_TLS_ACCEPTING = 2
const byte S_TLS_CLOSING = 3
const byte S_CLOSED = 4
byte state
Command nextCommand
byte buf[]
int bufNextByte
int payloadPending
int lastActiveTime //timed eviction
bool evict //immediate eviction
int reqWorkCount
int unsentBytes
ReqWorkerThread curReqThread
NetWorkItem next
NetWorkItem prev
}
data ReqWorkerThread {
Thread thread
int threadID
WorkItem work
WorkItem lastWork
int workCount
}
data NetThreadInfo {
int index
}
data NetWorkerThread {
Thread thread
int threadID
TCPMonitor monitor
NetWorkItem work
NetWorkItem workEnd
int workCount
}
component provides RequestHandler(Destructor) requires io.Output out, io.FileSystem fileSystem, io.File, ws.DocStream, data.StringUtil stringUtil,
data.IntUtil intUtil, ws.Web rh, time.Timer timer, net.http.Util util, time.Calendar cal, net.TCPMonitor, net.TLS {
char web_root[] = "./"
ReqWorkerThread reqThreadPool[] = new ReqWorkerThread[REQ_THREAD_POOL_SIZE]
NetWorkerThread netThreadPool[] = new NetWorkerThread[NET_THREAD_POOL_SIZE]
Mutex workerLock = new Mutex()
char END_SEQ[] = "\r\n\r\n"
char END_R[] = "\r"
char END_N[] = "\n"
bool debugConnections
bool shutdown
int nextWorkID = 0
void logData(char fileName[], char value[])
{
File fd = new File(fileName, File.WRITE)
fd.setPos(fd.getSize())
fd.write(value)
fd.write("\n")
fd.close()
}
void copyBytes(byte dest[], byte src[], int destOffset, int srcOffset, int len)
{
int j = destOffset
for (int i = 0; i < len; i++)
{
dest[j] = src[srcOffset+i]
j ++
}
}
Command parseCommand(char buf[], int top)
{
//we use a custom parser here, to detect "\r\n", ":", and spaces
// - also detect keep-alive right here, and tag it as a boolean on the Command instance
// - we also perform lowercase operations manually as we go
// - and we extract the file extension for mime type conversion later
Command cmd = new Command()
Header headers[]
int i = 0
byte mode = 0
int start = 0
int dot = 0
//first we parse the verb / resource line
for (i = 0; i < top; i++)
{
if (mode == 0)
{
if (buf[i] == " ")
{
//this is the end of the HTTP verb
char verb[] = new char[i-start]
copyBytes(verb, buf, 0, start, i-start)
//out.println("verb '$verb'")
if (verb == "get") cmd.type = Command.GET
else if (verb == "post") cmd.type = Command.POST
mode = 1
start = i + 1
}
else if (buf[i] >= "A" && buf[i] <= "Z")
{
//lowercase this
buf[i] += 32
}
}
else if (mode == 1)
{
if (buf[i] == " ")
{
//this is the end of the HTTP resource
char resource[] = new char[i-start]
copyBytes(resource, buf, 0, start, i-start)
//capture the file extension, if any
if (dot != 0)
{
char ext[] = new char[i-(dot+1)]
copyBytes(ext, buf, 0, dot+1, i-(dot+1))
cmd.ext = ext
}
//out.println("rsrc '$resource'")
cmd.resource = resource
mode = 2
start = i + 1
}
else if (buf[i] == ".")
{
dot = i
}
}
else if (mode == 2)
{
if (buf[i] == "\r")
{
mode = 3
}
else if (buf[i] >= "A" && buf[i] <= "Z")
{
//lowercase this
buf[i] += 32
}
}
else if (mode == 3)
{
if (buf[i] == "\n")
{
//this is the end of the HTTP version
char version[] = new char[i-start-1]
copyBytes(version, buf, 0, start, i-start-1)
//out.println("vers '$version'")
if (version == "http/1.0") { cmd.version = Command.HTTP_1_0 }
else if (version == "http/1.1") { cmd.version = Command.HTTP_1_1 }
mode = 3
start = i + 1
break
}
else
{
throw new Exception("malformed HTTP request, expected newline in verb")
}
}
}
if (cmd.resource == null)
throw new Exception("malformed HTTP request, missing resource in verb")
if (cmd.version == 0)
throw new Exception("malformed HTTP request, missing version in verb")
//next we have headers
char hkey[]
char hval[]
mode = 0
for (; i < top; i++)
{
if (mode == 0)
{
if (buf[i] == ":")
{
int end = i
while (buf[end] == " ") end --
if (end <= start) throw new Exception("malformed HTTP request, missing key in header line")
hkey = new char[end-start]
copyBytes(hkey, buf, 0, start, end-start)
mode = 1
start = i + 1
}
else if (buf[i] >= "A" && buf[i] <= "Z")
{
//lowercase this
buf[i] += 32
}
}
else if (mode == 1)
{
if (buf[i] == "\r")
{
mode = 2
}
}
else if (mode == 2)
{
if (buf[i] == "\n")
{
while (buf[start] == " ") start ++
if (i <= start-1) throw new Exception("malformed HTTP request, missing value in header line")
hval = new char[i-start-1]
copyBytes(hval, buf, 0, start, i-start-1)
headers = new Header[](headers, new Header(hkey, hval))
//out.println("hdr '$hkey' '$hval'")
//special header checks
if (hkey == "host") cmd.hostname = hval
if (hkey == "connection") cmd.persistent = hval.ifind("keep-alive") != StringUtil.NOT_FOUND
mode = 0
start = i + 1
}
else
{
throw new Exception("malformed HTTP request, expected newline in header")
}
}
}
cmd.headers = headers
return cmd
}
char[] getHeaderValue(Command cmd, char key[])
{
key = stringUtil.lowercase(key)
for (int i = 0; i < cmd.headers.arrayLength; i++)
{
if (cmd.headers[i].key == key)
return cmd.headers[i].value
}
return null
}
bool serveFunction_GET(DocStream ds, char path[], char command[]) {
return rh.get(command, ds)
}
bool serveFunction_POST(DocStream ds, char path[], char command[], char ctype[], byte payload[]) {
return rh.post(command, ctype, payload, ds)
}
char[] getMIMEType(char ext[], KeyVal mimeTypes[])
{
if (ext == null)
return "application/octet-stream"
for (int i = 0; i < mimeTypes.arrayLength; i ++)
{
if (mimeTypes[i].key == ext)
{
return mimeTypes[i].val
}
}
return "application/octet-stream"
}
char[] getResourcePrepend(char host[], KeyVal subdomains[])
{
for (int i = 0; i < subdomains.arrayLength; i ++)
{
if (subdomains[i].key == host) return subdomains[i].val
}
return null
}
bool serveFile(char command[], char ext[], Stream s, ConfigData cfg, bool keepAlive, bool includeContent, opt char hdrLine[])
{
char path[] = new char[](web_root, command)
if (hdrLine == null) hdrLine = "HTTP/1.1 200 OK"
if (fileSystem.exists(path))
{
if (fileSystem.getInfo(path).type == FileInfo.TYPE_DIR)
{
//try common landing pages
bool found = false
for (int i = 0; i < cfg.landingPages.arrayLength; i++)
{
char test[] = new char[](path, cfg.landingPages[i].string)
if (fileSystem.exists(test))
{
if (cfg.landingPages[i].string.endsWith(".html")) ext = "html"
path = test
found = true
break
}
}
if (!found)
{
return false
}
}
File fd = new File(path, File.READ)
s.send("$hdrLine\r\n")
s.send("Server: Dana Web Engine\r\n")
if (!keepAlive)
s.send("Connection: close\r\n")
else
s.send("Connection: keep-alive\r\n")
s.send("Content-length: $(intUtil.makeString(fd.getSize()))\r\n")
s.send("Content-Type: $(getMIMEType(ext, cfg.mimeTypes))\r\n")
s.send("Date: $(util.getDateString(cal.getTime()))\r\n")
s.send("Last-Modified: $(util.getDateString(fileSystem.getInfo(path).modified))\r\n")
s.send("\r\n")
if (includeContent)
{
while (!fd.eof())
{
byte buf[] = fd.read(MAX_CHUNK)
s.send(buf)
}
}
fd.close()
return true
}
return false
}
bool staticFile(char command[], String staticServe[])
{
for (int i = 0; i < staticServe.arrayLength; i++)
{
if ((command.arrayLength >= staticServe[i].string.arrayLength) &&
(stringUtil.subString(command, 0, staticServe[i].string.arrayLength) == staticServe[i].string))
{
return true
}
}
return false
}
void processRequest(WorkItem work)
{
Stream stream = work.stream
String landingPages[] = work.landingPages
String staticServe[] = work.cfg.staticServe
bool keepAlive = true
//parse headers, extract command
//call function associated with command, return result (as HTTP response)
Command cmd = work.nextCommand
if (debugConnections) out.println("[NWID $(work.fromNetItem.workID)] process req $(cmd.resource)")
if (cmd == null)
{
//malformed / interrupted / idle-timeout request (this is normal behaviour)
return
}
keepAlive = cmd.persistent
char subDomainPath[] = getResourcePrepend(cmd.hostname, work.cfg.subdomains)
char handler[]
bool success = false
if (cmd.type == Command.GET)
{
if (staticFile(cmd.resource, staticServe))
{
success = serveFile(cmd.resource, cmd.ext, stream, work.cfg, keepAlive, true)
}
else
{
if (subDomainPath != null) cmd.resource = "/$subDomainPath$(cmd.resource)"
DocStream ds = new DocStream(stream, cmd.headers)
success = serveFunction_GET(ds, handler, cmd.resource)
}
}
else if (cmd.type == Command.HEAD)
{
if (staticFile(cmd.resource, staticServe))
{
success = serveFile(cmd.resource, cmd.ext, stream, work.cfg, keepAlive, false)
}
else
{
//TODO. -- what's the best option here? a Web.head() method? or just some auto-response?
}
}
else if (cmd.type == Command.POST)
{
if (subDomainPath != null) cmd.resource = "/$subDomainPath$(cmd.resource)"
DocStream ds = new DocStream(stream, cmd.headers)
//read the payload first
char ctype[] = getHeaderValue(cmd, "content-type")
byte payload[] = work.payload
success = serveFunction_POST(ds, handler, cmd.resource, ctype, payload)
}
else
{
stream.send("HTTP/1.1 501 Not Implemented\r\n")
stream.send("\r\n")
success = true
}
if (!success)
{
if (fileSystem.exists("404.html"))
{
serveFile("404.html", "html", stream, work.cfg, keepAlive, cmd.type != Command.HEAD, "HTTP/1.1 404 Resource Not Found")
}
else
{
stream.send("HTTP/1.1 404 Resource Not Found\r\n")
stream.send("Server: Dana Web Engine\r\n")
if (keepAlive)
stream.send("Connection: keep-alive\r\n")
else
stream.send("Connection: close\r\n")
stream.send("Content-length: $(MSG_404.arrayLength)\r\n")
stream.send("Content-type: text/plain \r\n")
stream.send("Date: $(util.getDateString(cal.getTime()))\r\n")
stream.send("\r\n")
stream.send(MSG_404)
}
}
if (debugConnections) out.println("[NWID $(work.fromNetItem.workID)] process req $(cmd.resource) OK")
mutex(work.fromNetItem)
{
work.fromNetItem.lastActiveTime = cal.getMS()
work.fromNetItem.reqWorkCount --
}
}
RequestHandler:RequestHandler()
{
//init tasks for web app
rh.setup()
//start thread pool
for (int i = 0; i < NET_THREAD_POOL_SIZE; i++)
{
netThreadPool[i] = new NetWorkerThread()
netThreadPool[i].monitor = new TCPMonitor()
netThreadPool[i].thread = asynch::netWorkerThread(netThreadPool[i])
netThreadPool[i].threadID = i
}
for (int i = 0; i < REQ_THREAD_POOL_SIZE; i++)
{
reqThreadPool[i] = new ReqWorkerThread()
reqThreadPool[i].thread = asynch::reqWorkerThread(reqThreadPool[i])
reqThreadPool[i].threadID = i
}
//asynch::timerThread()
}
void reqWorkerThread(ReqWorkerThread state)
{
while (true)
{
this.thread.wait()
if (state.workCount != 0)
{
WorkItem w = state.work
mutex(state)
{
w = state.work
state.work = w.next
if (state.work == null) state.lastWork = null
w.next = null
}
processRequest(w)
if (w.nextCommand != null && !w.nextCommand.persistent)
{
w.fromNetItem.evict = true
//TODO: we MUST pass state into this first parameter, somehow!!!
closeConnection(null, w.fromNetItem, false)
}
mutex(state)
{
state.workCount --
w.fromNetItem = null
w = null
}
}
else if (shutdown)
{
return
}
}
}
eventsink NetEvents(EventData ed)
{
//only one event is possible here (sendWait)
TCPSocket client = ed.source
NetThreadInfo threadInfo = ed.details
mutex(threadInfo)
{
if (client.connected())
netThreadPool[threadInfo.index].monitor.armSendNotify(client)
}
}
eventsink NetEventsTLS(EventData ed)
{
//only one event is possible here (sendWait)
TLS tls = ed.source
TCPSocket client = tls.getSocket()
NetThreadInfo threadInfo = ed.details
netThreadPool[threadInfo.index].monitor.armSendNotify(client)
}
byte[] expandBuffer(byte buf[], int newMinSize, int incrementSize)
{
//out.println("expand buffer from $(buf.arrayLength) to at least $(newMinSize) via $(incrementSize)")
int newActualSize = 0
if (incrementSize != 0)
{
newActualSize = newMinSize
if ((newMinSize % incrementSize) != 0) newActualSize += (incrementSize - (newMinSize % incrementSize))
}
else
{
newActualSize = newMinSize
}
byte newBuf[] = new byte[newActualSize]
newBuf =[] buf
//out.println(" -- buffer now $newActualSize bytes")
return newBuf
}
int receiveBytes(NetWorkItem item)
{
//receive a chunk of data; append it to item's buf;
//check if that buf now contains a complete request header (and if it has any bytes after that request, which might form the request body);
//if it does, fire the processRequest() function to handle that request (BUT that needs to be in a different thread, since it'll need sendBuffer!);
//if the receive call returns "null", check for close status and consider closing the socket & removing it
bool requestReady = false
Command cmd = null
byte buf[] = item.stream.recv(RECV_SIZE)
if (buf != null)
{
//expand our item's buffer if needed, otherwise append the data to it
if ((item.bufNextByte + buf.arrayLength) > item.buf.arrayLength)
{
//if we're in NetWorkItem.S_WAIT_REQUEST mode, check we're not going to go over some header size limit
if ((item.state == NetWorkItem.S_WAIT_REQUEST) && (item.bufNextByte + buf.arrayLength > MAX_HEADER))
return RECV_CONNECTION_CLOSE
item.buf = expandBuffer(item.buf, item.bufNextByte + buf.arrayLength, BUF_SIZE_INCREMENT)
}
copyBytes(item.buf, buf, item.bufNextByte, 0, buf.arrayLength)
if (item.state == NetWorkItem.S_WAIT_REQUEST)
{
//scan from the current bufNextByte to understand if the remainder of the request is now available in the buffer
// - if it is, finish processing the request into a Command (and if there are remaining bytes after that point, shift those bytes to the start of buf and adjust bufNextByte accordingly)
int searchFrom = item.bufNextByte
item.bufNextByte += buf.arrayLength
if (searchFrom < 4)
searchFrom = 0
else
searchFrom = searchFrom - 4
//out.println("[NWID $(item.workID)] buf now: '$(item.buf)'")
int pos = 0
if ((pos = stringUtil.find(item.buf, "\r\n\r\n", searchFrom)) != StringUtil.NOT_FOUND)
{
cmd = parseCommand(item.buf, pos + 4)
if (item.bufNextByte > (pos + 5))
{
//copy-down
byte tmp[] = dana.sub(item.buf, pos + 4, item.bufNextByte)
item.buf =[] tmp
item.bufNextByte = item.bufNextByte - (pos + 4)
}
else
{
item.bufNextByte = 0
}
//check for a payload
char clength[] = null
if ((clength = getHeaderValue(cmd, "content-length")) != null)
{
item.payloadPending = intUtil.intFromString(clength)
//check if this payload size is <= some max payload value
if (item.payloadPending > MAX_PAYLOAD)
{
return RECV_CONNECTION_CLOSE
}
if (item.bufNextByte < item.payloadPending)
{
if (item.payloadPending > item.buf.arrayLength) expandBuffer(item.buf, item.payloadPending, BUF_SIZE_INCREMENT)
item.state = NetWorkItem.S_WAIT_PAYLOAD
}
else
{
requestReady = true
}
}
else
{
item.payloadPending = 0
requestReady = true
}
}
}
else if (item.state == NetWorkItem.S_WAIT_PAYLOAD)
{
item.bufNextByte += buf.arrayLength
if (item.bufNextByte >= item.payloadPending)
{
//ready to process
cmd = item.nextCommand
requestReady = true
}
}
}
else
{
//connection-close?
if (!item.client.connected())
return RECV_CONNECTION_CLOSE
}
if (requestReady)
{
ReqWorkerThread worker
//here we make sure that all requests from the same network socket are queued up on the same request worker thread, to make sure they're processed sequentially
mutex(item)
{
if (item.curReqThread == null)
worker = getReqWorker()
else
worker = item.curReqThread
WorkItem nwi = new WorkItem(item.client, item.tlsClient, item.stream, item.cfg.landingPages, item.cfg)
nwi.nextCommand = cmd
if (item.payloadPending != 0) nwi.payload = dana.sub(item.buf, 0, item.payloadPending)
nwi.fromNetItem = item
mutex(worker)
{
nwi.threadID = worker.threadID
if (worker.lastWork == null)
worker.work = nwi
else
worker.lastWork.next = nwi
worker.lastWork = nwi
worker.workCount ++
}
item.curReqThread = worker
item.reqWorkCount ++
}
worker.thread.signal()
}
return RECV_CONNECTION_CONTINUE
}
bool closeConnection(NetWorkerThread state, NetWorkItem item, bool immediate)
{
if (item.tlsClient != null)
{
item.state = NetWorkItem.S_TLS_CLOSING
int status = item.tlsClient.close()
if (!immediate)
{
if (status == TLS.OK)
{
item.client.disconnect()
item.state = NetWorkItem.S_CLOSED
//out.println("SSL_CLOSED")
return true
}
else if (status == TLS.WAIT_WRITE)
{
state.monitor.armSendNotify(item.client)
//out.println("SSL_CLOSE_WAIT_WRITE")
}
}
else
{
item.client.disconnect()
item.state = NetWorkItem.S_CLOSED
return true
}
return false
}
else
{
item.client.disconnect()
item.state = NetWorkItem.S_CLOSED
}
return true
}
void closeItem(NetWorkerThread state, NetWorkItem item)
{
//remove the item from our work list
mutex(state)
{
//out.println("rm-socket $(item.client.getID())")
state.monitor.remSocket(item.client)
item.curReqThread = null
//if it's already at the end, do nothing
if (item === state.workEnd)
{
if (state.work === state.workEnd)
{
state.work = null
state.workEnd = null
}
else
{
state.workEnd = state.workEnd.prev
state.workEnd.next = null
}
}
else if (item === state.work)
{
//if it's at the start, some special casing
state.work.next.prev = null
state.work = state.work.next
}
else
{
//it's something in the middle
item.next.prev = item.prev
item.prev.next = item.next
}
item.next = null
item.prev = null
}
}
void moveWorkToEnd(NetWorkerThread state, NetWorkItem item)
{
mutex(state)
{
//if it's already at the end, do nothing
if (item === state.workEnd)
{
return
}
//if it's at the start, some special casing
if (item === state.work)
{
state.work.next.prev = null
state.work = state.work.next
}
else
{
//it's something in the middle
item.next.prev = item.prev
item.prev.next = item.next
}
//put it at the end
item.next = null
item.prev = null
state.workEnd.next = item
item.prev = state.workEnd
state.workEnd = item
}
}
void pruneConnections(NetWorkerThread state, int now)
{
//look at the queue of work items, and see if any of them have been inactive for too long and should be culled/closed
mutex (state)
{
//keep checking the first item in the list, and remove and close that connection if the last-active time is too old
// -- we need awareness here that the current time might be before the lastActiveTime, in the case of clock-wraps; in this case we need to just overwrite the lastActiveTime with the new version and return to it next time (?)
while (state.work != null)
{
if (state.work.lastActiveTime > now)
{
state.work.lastActiveTime = now
return
}
//here we check if a net item has no actual work pending (meaning it's received a request and is busy sending the response, so we don't expect any further inbound socket activity until the response has been fully sent)
if (state.work.reqWorkCount == 0 && (state.work.evict || ((now - state.work.lastActiveTime) > MAX_INACTIVE)))
{
if (debugConnections) out.println(" -- [NWID $(state.work.workID)] connection-evict")
//out.println("EVICT $(state.work.workID)")
if (state.work.state == NetWorkItem.S_CLOSED || closeConnection(state, state.work, false))
{
closeItem(state, state.work)
}
else
{
return
}
}
else
{
return
}
}
}
}
void netWorkerThread(NetWorkerThread state)
{
while (true)
{
MonitorEvent events[] = state.monitor.wait(MONITOR_WAIT_TIME)
//out.println("socket/time event")
int now = cal.getMS()
for (int i = 0; i < events.arrayLength; i++)
{
NetWorkItem item = events[i].userData
bool closed = false
//update the last-active time on this item
item.lastActiveTime = now
//move the work item to the end of our list, so we end up with a sorted list of least-recently-active at the start
moveWorkToEnd(state, item)
if (events[i].recvReady && item.state != NetWorkItem.S_CLOSED)
{
if (item.state == NetWorkItem.S_TLS_ACCEPTING)
{
int status = item.tlsClient.accept(item.client)
if (status == TLS.OK)
{
item.state = NetWorkItem.S_WAIT_REQUEST
//out.println(" (SSL ACCEPT_OK-l)")
}
else if (status == TLS.WAIT_WRITE)
{
state.monitor.armSendNotify(item.client)
//out.println(" (SSL ACCEPT_WW-l)")
}
}
else if (item.state == NetWorkItem.S_TLS_CLOSING)
{
int status = item.tlsClient.close()
if (status == TLS.OK)
{
item.state = NetWorkItem.S_WAIT_REQUEST
item.client.disconnect()
closeItem(state, events[i].userData)
//out.println(" (SSL CLOSE_OK-l)")
}
else if (status == TLS.WAIT_WRITE)
{
state.monitor.armSendNotify(item.client)
//out.println(" (SSL CLOSE_WW-l)")
}
}
else if (!item.evict)
{
if (debugConnections) out.println("[NWID $(item.workID)] client socket receive-ready")
int status = receiveBytes(events[i].userData)
if (status == RECV_CONNECTION_CLOSE)
{
if (debugConnections) out.println(" -- [NWID $(item.workID)] recv requests connection-close")
if (closeConnection(state, events[i].userData, false))
{
closeItem(state, events[i].userData)
closed = true
}
}
}
}
if (events[i].sendReady)
{
if (item.state == NetWorkItem.S_TLS_ACCEPTING)
{
int status = item.tlsClient.accept(item.client)
if (status == TLS.OK)
{
item.state = NetWorkItem.S_WAIT_REQUEST
//out.println(" (SSL ACCEPT_OK-lw)")
}
else if (status == TLS.WAIT_WRITE)
{
state.monitor.armSendNotify(item.client)
//out.println(" (SSL ACCEPT_WW-lw)")
}
}
else if (item.state == NetWorkItem.S_TLS_CLOSING)
{
int status = item.tlsClient.close()
if (status == TLS.OK)
{
item.state = NetWorkItem.S_WAIT_REQUEST
item.client.disconnect()
closeItem(state, events[i].userData)
//out.println(" (SSL CLOSE_OK-lw)")
}
else if (status == TLS.WAIT_WRITE)
{
state.monitor.armSendNotify(item.client)
//out.println(" (SSL CLOSE_WW-lw)")
}
}
else
{
if (debugConnections) out.println(" -- [NWID $(item.workID)] client socket send-ready")
if (item.tlsClient != null)
item.tlsClient.sendBuffer()
else
events[i].socket.sendBuffer()
}
}
if (events[i].close && !closed)
{
//this is a TCP-level socket closed event, meaning the client has killed the TCP connection
// - there seems to be a case here where TLS will permanently say "want read" on shutdown, so we pass a flag to closeConnection() to say "close TCP immediately and stop" in the TLS case
if (debugConnections) out.println(" -- [NWID $(item.workID)] connection-close event")
if (closeConnection(state, events[i].userData, true))
{
closeItem(state, events[i].userData)
}
}
}
pruneConnections(state, now)
if (state.workCount == 0 && shutdown)
{
return
}
}
}
ReqWorkerThread getReqWorker()
{
int smallestIndex = 0
mutex(workerLock)
{
for (int i = 1; i < reqThreadPool.arrayLength; i++)
{
if (reqThreadPool[i].workCount < reqThreadPool[smallestIndex].workCount)
smallestIndex = i
}
}
return reqThreadPool[smallestIndex]
}
void RequestHandler:processStream(store TCPSocket s, store TLSContext tlsCtx, store ConfigData cfg)
{
TLS tls
Stream stream = s
//out.println("NEW connection")
if (tlsCtx != null)
{
tls = new TLS(tlsCtx)
stream = tls
//out.println(" (SSL)")
}
NetWorkItem nwi = new NetWorkItem(s, tls, stream, cfg)
nwi.buf = new byte[BUF_SIZE_INCREMENT]
nwi.workID = nextWorkID
nwi.lastActiveTime = cal.getMS()
nextWorkID ++
//we currently only ever use the first net thread...
int workerIndex = 0
NetWorkerThread thread = netThreadPool[workerIndex]
mutex(thread)
{
s.setNonBlocking(new NetThreadInfo(workerIndex))
//if "tls" is true we need to call tls.accept(), record the TLS state machine within nwi, and only sinkevent on tls
if (tls == null)
{
sinkevent NetEvents(s)
}
else
{
tls.setNonBlocking(new NetThreadInfo(workerIndex))
nwi.state = NetWorkItem.S_TLS_ACCEPTING
sinkevent NetEventsTLS(tls)
}
if (thread.work == null)
{
thread.work = nwi
}
else
{
thread.workEnd.next = nwi
nwi.prev = thread.workEnd
}
thread.workEnd = nwi
thread.monitor.addSocket(s, nwi)
if (tls != null)
{
int status = tls.accept(s)
if (status == TLS.OK)
{
nwi.state = NetWorkItem.S_WAIT_REQUEST
//out.println(" (SSL ACCEPT_OK)")
}
else if (status == TLS.WAIT_WRITE)
{
thread.monitor.armSendNotify(s)
//out.println(" (SSL ACCEPT_WW)")
}
}
if (debugConnections) out.println("client socket added as [NWID $(nwi.workID)]")
}
}
void Destructor:destroy()
{
shutdown = true
for (int i = 0; i < reqThreadPool.arrayLength; i++)
{
reqThreadPool[i].thread.signal()
reqThreadPool[i].thread.join()
}
for (int i = 0; i < netThreadPool.arrayLength; i++)
{
netThreadPool[i].thread.join()
while (netThreadPool[i].work != null)
{
closeItem(netThreadPool[i], netThreadPool[i].work)
}
}
reqThreadPool = null
netThreadPool = null
}
}