lights FAQ Forum github.com/luapower/sock
This package
sock
libtls

Networking
sock
connpool
resolver
http
uri
rsync
libtls
libtls_bearssl
bearssl

sock

Portable socket API with IOCP, epoll and kqueue for LuaJIT


local sock = require'sock'

Portable coroutine-based async socket API. For scheduling it uses IOCP on Windows, epoll on Linux and kqueue on OSX.

Rationale

Replace LuaSocket which doesn’t scale being select()-based, and improve on other aspects too (single file, nothing to compile, use cdata buffers instead of strings, don’t bundle unrelated modules, coro-based async only, multi-threading support).

Status

Windows & Linux only.

API

address lookup
sock.addr(...) -> ai look-up a hostname
ai:free() free the address list
ai:next() -> ai|nil get next address in list
ai:addrs() -> iter() -> ai iterate addresses
ai:type() -> s socket type: ‘tcp’, …
ai:family() -> s address family: ‘inet’, …
ai:protocol() -> s protocol: ‘tcp’, ‘icmp’, …
ai:name() -> s cannonical name
ai:tostring() -> s formatted address
ai.addr -> sa address object
sa:family() -> s address family: ‘inet’, …
sa:port() -> n address port
sa:tostring() -> s ‘ip:port’
sa:addr() -> ip IP address object
ip:tobinary() -> uint8_t[4|16], 4|16 IP address in binary form
ip:tostring() -> s IP address in string form
sockets
sock.tcp([family][, protocol]) -> tcp make a TCP socket
sock.udp([family][, protocol]) -> udp make a UDP socket
sock.raw([family][, protocol]) -> raw make a raw socket
s:type() -> s socket type: ‘tcp’, …
s:family() -> s address family: ‘inet’, …
s:protocol() -> s protocol: ‘tcp’, ‘icmp’, …
s:close() send FIN and/or RST and free socket
s:bind([host], [port], [af]) bind socket to an address
s:setopt(opt, val) set socket option ('so_*' or 'tcp_*')
s:getopt(opt) -> val get socket option
tcp|udp:connect(host, port, [expires], [af], ...) connect to an address
tcp:send(s|buf, [len], [expires]) -> true send bytes to connected address
udp:send(s|buf, [len], [expires]) -> len send bytes to connected address
tcp|udp:recv(buf, maxlen, [expires]) -> len receive bytes
tcp:listen([backlog, ]host, port, [af]) put socket in listening mode
tcp:accept([expires]) -> ctcp accept a client connection
tcp:recvn(buf, len, [expires]) -> buf, len receive n bytes
tcp:recvall() -> buf, len receive until closed
tcp:recvall_read() -> read make a buffered read function
udp:sendto(host, port, s|buf, [len], [expires], [af]) -> len send a datagram to an address
udp:recvnext(buf, maxlen, [expires], [flags]) -> len, sa receive the next datagram
tcp:shutdown('r'|'w'|'rw', [expires]) send FIN
scheduling
sock.newthread(func[, name]) -> co create a coroutine for async I/O
sock.resume(thread, ...) -> ... resume thread
sock.yield(...) -> ... safe yield (see coro)
sock.suspend(...) -> ... suspend thread
sock.thread(func, ...) -> co create thread and resume
sock.cowrap(f) -> wrapper see coro.safewrap()
sock.currentthread() -> co see coro.running()
sock.transfer(co, ...) -> ... see coro.transfer()
sock.onthreadfinish(co, f) run f when thread finishes
sock.threadenv[thread] <-> env get/set thread environment
sock.poll() poll for I/O
sock.start() keep polling until all threads finish
sock.stop() stop polling
sock.run(f, ...) -> ... run a function inside a sock thread
sock.sleep_until(t) sleep without blocking until sock.clock() value
sock.sleep(s) sleep without blocking for s seconds
sock.sleep_job() -> sj make an interruptible sleep job
sj:sleep_until(t) -> ... sleep until sock.clock()
sj:sleep(s) -> ... sleep for s seconds
sj:wakeup(...) wake up the sleeping thread
sock.runat(t, f) -> sjt run f at clock t
sock.runafter(s, f) -> sjt run f after s seconds
sock.runevery(s, f) -> sjt run f every s seconds
sjt:cancel() cancel timer
multi-threading
sock.iocp([iocp_h]) -> iocp_h get/set IOCP handle (Windows)
sock.epoll_fd([epfd]) -> epfd get/set epoll fd (Linux)

All function return nil, err on error (but raise on user error or unrecoverable OS failure). Some error messages are normalized across platforms, like ‘access_denied’ and ‘address_already_in_use’ so they can be used in conditionals.

I/O functions only work inside threads created with sock.newthread().

The optional expires arg controls the timeout of the operation and must be a sock.clock()-relative value (which is in seconds). If the expiration clock is reached before the operation completes, nil, 'timeout' is returned.

host, port args are passed to sock.addr() (with the optional af arg), which means that an already resolved address can be passed as ai, nil in place of host, port.

Address lookup

sock.addr(...) -> ai

Look-up a hostname. Returns an “address info” object which is a OS-allocated linked list of one or more addresses resolved with the system’s getaddrinfo(). The args can be either an existing ai object which is passed through, or:

  • host, port, [socket_type], [family], [protocol], [af]

where

  • host can be a hostname, ip address or '*' which means “all interfaces”.
  • port can be a port number, a service name or 0 which means “any available port”.
  • socket_type can be 'tcp', 'udp', 'raw' or 0 (the default, meaning “all”).
  • family can be 'inet', 'inet6' or 'unix' or 0 (the default, meaning “all”).
  • protocol can be 'ip', 'ipv6', 'tcp', 'udp', 'raw', 'icmp', 'igmp' or 'icmpv6' or 0 (the default is either 'tcp', 'udp' or 'raw', based on socket type).
  • af are a glue.bor() list of passive, cannonname, numerichost, numericserv, all, v4mapped, addrconfig which map to getaddrinfo() flags.

NOTE: getaddrinfo() is blocking. If that’s a problem, use resolver.

Sockets

sock.tcp([family][, protocol]) -> tcp

Make a TCP socket. The default family is 'inet'.

sock.udp([family][, protocol]) -> udp

Make an UDP socket. The default family is 'inet'.

sock.raw([family][, protocol]) -> raw

Make a raw socket. The default family is 'inet'.

s:close()

Close the connection and free the socket.

For TCP sockets, if 1) there’s unread incoming data (i.e. recv() hasn’t returned 0 yet), or 2) so_linger socket option was set with a zero timeout, then a TCP RST packet is sent to the client, otherwise a FIN is sent.

s:bind([host], [port], [af])

Bind socket to an interface/port (which default to ’*’ and 0 respectively meaning all interfaces and a random port).

tcp|udp:connect(host, port, [expires], [af])

Connect to an address, binding the socket to ('*', 0) if not bound already.

For UDP sockets, this has the effect of filtering incoming packets so that only those coming from the connected address get through the socket. Also, you can call connect() multiple times (use ('*', 0) to switch back to unfiltered mode).

tcp:send(s|buf, [len], [expires], [flags]) -> true

Send bytes to the connected address. Partial writes are signaled with nil, err, writelen. Trying to send zero bytes is allowed but it’s a no-op (doesn’t go to the OS).

udp:send(s|buf, [len], [expires], [flags]) -> len

Send bytes to the connected address. Empty packets (zero bytes) are allowed.

tcp|udp:recv(buf, maxlen, [expires], [flags]) -> len

Receive bytes from the connected address. With TCP, returning 0 means that the socket was closed on the other side. With UDP it just means that an empty packet was received.

tcp:listen([backlog, ]host, port, [af])

Put the socket in listening mode, binding the socket if not bound already (in which case host and port args are ignored). The backlog defaults to 1/0 which means “use the maximum allowed”.

tcp:accept([expires]) -> ctcp

Accept a client connection. The connection socket has additional fields: remote_addr, remote_port, local_addr, local_port.

tcp:recvn(buf, len, [expires]) -> buf, len

Repeat recv until len bytes are received. Partial reads are signaled with nil, err, readlen.

tcp:recvall() -> buf,len | nil,err,buf,len

Receive until closed into an accumulating buffer. If an error occurs before the socket is closed, the partial buffer and length is returned after it.

tcp:recvall_read() -> read

Receive all data into a buffer and make a read function that consumes it. Useful for APIs that require an input read function that cannot yield.

udp:sendto(host, port, s|buf, [maxlen], [expires], [flags], [af]) -> len

Send a datagram to a specific destination, regardless of whether the socket is connected or not.

udp:recvnext(buf, maxlen, [expires], [flags]) -> len, sa

Receive the next incoming datagram, wherever it came from, along with the source address. If the socket is connected, packets are still filtered though.

tcp:shutdown('r'|'w'|'rw')

Shutdown the socket for receiving, sending or both. Does not block.

Sends a TCP FIN packet to indicate refusal to send/receive any more data on the connection. The FIN packet is only sent after all the current pending data is sent (unlike RST which is sent immediately). When a FIN is received recv() returns 0.

Calling close() without shutdown may send a RST (see the notes on close() for when that can happen) which may cause any data that is pending either on the sender side or on the receiving side to be discarded (that’s how TCP works: RST has that data-cutting effect).

Required for lame protocols like HTTP with pipelining: a HTTP server that wants to close the connection before honoring all the received pipelined requests needs to call s:shutdown'w' (which sends a FIN to the client) and then continue to receive (and discard) everything until a recv that returns 0 comes in (which is a FIN from the client, as a reply to the FIN from the server) and only then it can close the connection without messing up the client.

Scheduling

Scheduling is based on synchronous coroutines provided by coro which allows coroutine-based iterators that perform socket I/O to be written.

sock.newthread(func) -> co

Create a coroutine for performing async I/O. The coroutine must be resumed to start. When the coroutine finishes, the control is transfered to the I/O thread (the thread that called start()).

Full-duplex I/O on a socket can be achieved by performing reads in one thread and writes in another.

sock.resume(thread, ...)

Resume a thread, which means transfer control to it, but also temporarily change the I/O thread to be this thread so that the first suspending call (send, recv, sleep, suspend, etc.) gives control back to this thread. This is the trick to starting multiple threads before starting polling.

sock.suspend(...) -> ...

Suspend current thread, transfering to the polling thread (but also see resume()).

sock.poll(timeout) -> true | false,'timeout'

Poll for the next I/O event and resume the coroutine that waits for it.

Timeout is in seconds with anything beyond 2^31-1 taken as infinte and defaults to infinite.

sock.start(timeout)

Start polling. Stops after the timeout expires and there’s no more I/O or stop() was called.

sock.stop()

Tell the loop to stop dequeuing and return.

sock.sleep_until(t)

Sleep until a time.clock() value without blocking other threads.

sock.sleep(s)

Sleep s seconds without blocking other threads.

sock.sleep_job() -> sj

Make an interruptible sleeping job. Put the current thread sleep using sj:sleep() or sj:sleep_until() and then from another thread call sj:wakeup() to resume the sleeping thread. Any arguments passed to wakeup() will be returned by sleep().

Multi-threading

sock.iocp([iocp_handle]) -> iocp_handle

Get/set the global IOCP handle (Windows).

IOCPs can be shared between OS threads and having a single IOCP for all threads (as opposed to having one IOCP per thread/Lua state) enables the kernel to better distribute the completion events between threads.

To share the IOCP with another Lua state running on a different thread, get the IOCP handle with sock.iocp(), copy it over to the other state, then set it with sock.iocp(copied_iocp).

sock.epoll_fd([epfd]) -> epfd

Get/set the global epoll fd (Linux).

Epoll fds can be shared between OS threads and having a single epfd for all threads is more efficient for the kernel than having one epfd per thread.

To share the epfd with another Lua state running on a different thread, get the epfd with sock.epoll_fd(), copy it over to the other state, then set it with sock.epoll_fd(copied_epfd).


Last updated: 22 months ago | Edit on GitHub

Package:sock
Pkg type:Lua+ffi
Version: 10c356c
Last commit:
Author: Cosmin Apreutesei
License: Public Domain

Requires: coro  glue  heap  libtls  luajit  time  winapi 

Required by: connpool  dollar  webb  x-widgets 


Top