Socket 的集合

A 'hello' enters a tin can and exits from a bucket

到目前為止,我們在處理 Erlang 本身時玩得很開心,幾乎沒有與外界溝通,只是透過我們這裡讀取的一些文字檔案。儘管與自己建立關係可能很有趣,但現在是時候走出我們的巢穴,開始與世界其他地方對話了。

本章將涵蓋使用 socket 的三個組成部分:IO list、UDP socket 和 TCP socket。IO list 作為一個主題來說並不是很複雜。它們只是一種巧妙的方式,可以有效地建立要透過 socket 和其他 Erlang 驅動程式發送的字串。

IO List

我之前在本指南中提到,對於文字,我們可以使用字串(整數的列表)或二進制(一個儲存資料的二進制資料結構)。透過線路傳輸諸如「Hello World」之類的東西,可以將其作為字串 "Hello World",並作為二進制 <<"Hello World">>。符號相似,結果相似。

區別在於您如何組裝東西。字串有點像整數的鍊式列表:對於每個字元,您都必須儲存字元本身以及指向列表其餘部分的連結。此外,如果您想在列表的中間或結尾添加元素,則必須遍歷整個列表,直到您要修改的位置,然後再添加元素。但是,在 prepend 時情況並非如此

A = [a]
B = [b|A] = [b,a]
C = [c|B] = [c,b,a]

在 prepend 的情況下,如上所示,無論 ABC 中保存的內容都不需要重寫。C 的表示形式可以看作 [c,b,a][c|B][c,|[b|[a]]] 等。在最後一種情況下,您可以看到 A 的形狀在列表結尾處與宣告時相同。B 也是如此。以下是 append 的情況

A = [a]
B = A ++ [b] = [a] ++ [b] = [a|[b]]
C = B ++ [c] = [a|[b]] ++ [c] = [a|[b|[c]]]

您看到所有重寫了嗎?當我們建立 B 時,我們必須重寫 A。當我們寫入 C 時,我們必須重寫 B(包括它包含的 [a|...] 部分)。如果我們以類似的方式添加 D,我們將需要重寫 C。對於長字串,這變得太沒有效率了,並且會產生大量垃圾,需要由 Erlang VM 清理。

對於二進制,情況並沒有那麼糟糕

A = <<"a">>
B = <<A/binary, "b">> = <<"ab">>
C = <<B/binary, "c">> = <<"abc">>

在這種情況下,二進制知道它們自己的長度,並且可以在恆定時間內將資料聯結。這很好,比列表好得多。它們也更緊湊。由於這些原因,我們在未來使用文字時,通常會盡量堅持使用二進制。

但是,有一些缺點。二進制旨在以某些方式處理事物,並且修改二進制、分割二進制等仍然需要成本。此外,有時我們會使用可互換使用字串、二進制和個別字元的程式碼。不斷在類型之間轉換將會很麻煩。

在這些情況下,IO list 是我們的救星。IO list 是一種奇怪的資料結構類型。它們是位元組(0 到 255 的整數)、二進制或其他 IO list 的列表。這表示接受 IO list 的函數可以接受諸如 [$H, $e, [$l, <<"lo">>, " "], [[["W","o"], <<"rl">>]] | [<<"d">>]] 之類的項目。當發生這種情況時,Erlang VM 會在需要時扁平化列表,以取得字元序列 Hello World

哪些函數接受這種 IO List?大多數與輸出資料有關的函數都會接受。來自 io 模組、file 模組的任何函數,以及 TCP 和 UDP socket 都可以處理它們。一些函式庫函數,例如來自 unicode 模組的一些函數和來自 re(用於正則表達式)模組的所有函數也會處理它們,僅舉幾個例子。

在 shell 中使用 io:format("~s~n", [IoList]) 嘗試先前的 Hello World IO List,看看會發生什麼。它應該可以正常運作。

A guido with an RJ-45 connection head

總而言之,當動態建立要輸出的內容時,它們是一種非常聰明的方法,可以建立字串以避免不可變資料結構的問題。

TCP 和 UDP:兄弟協議

我們可以在 Erlang 中使用的第一種 socket 基於 UDP 協定。UDP 是一種建立在 IP 層之上的協定,它在 IP 層之上提供了一些抽象,例如連接埠號碼。UDP 被稱為無狀態協定。從 UDP 連接埠接收的資料被分成小部分、未標記、沒有會話,並且無法保證您接收到的片段與您接收它們的順序相同。事實上,無法保證如果有人傳送封包,您就會收到它。由於這些原因,當封包很小,有時遺失不會產生太大後果,當沒有發生太多複雜的交換,或者絕對需要低延遲時,人們傾向於使用 UDP。

這與 TCP 等有狀態協定相反,其中協定負責處理遺失的封包、重新排序封包、維護多個傳送者和接收者之間的隔離會話等。TCP 將允許可靠的資訊交換,但風險是設定速度較慢且較重。UDP 將很快,但不太可靠。根據您的需求仔細選擇。

無論如何,在 Erlang 中使用 UDP 相對簡單。我們在給定的連接埠上設定一個 socket,並且該 socket 可以傳送和接收資料

Diagram showing a Host A that has ports A, B and C, which can all send and receive packets to other hosts

對於一個糟糕的類比,這就像在你的房子裡有一堆信箱(每個信箱都是一個連接埠),並且在每個信箱中收到帶有小訊息的小紙條。它們可以有任何內容,從「我喜歡你穿這件褲子的樣子」到「紙條來自房子裡面!」當某些訊息對於一張紙條來說太大時,就會有很多紙條被丟到信箱裡。您的工作是以有意義的方式重新組裝它們,然後開車到某個房子,然後將紙條作為回覆丟進去。如果訊息純粹是資訊性訊息(「嘿,你的門沒鎖」)或非常小的訊息(「你穿什麼? -Ron」),那應該沒問題,你可以將一個信箱用於所有查詢。但是,如果它們很複雜,我們可能希望每個會話使用一個連接埠,對嗎?呃,不!使用 TCP!

就 TCP 而言,該協定被稱為有狀態、基於連線的協定。在能夠傳送訊息之前,您必須進行交握。這表示有人正在接管一個信箱(類似於我們在 UDP 類比中的情況),並傳送一條訊息說「嘿,老兄,這是 IP 94.25.12.37 呼叫。想聊天嗎?」,您會回覆類似「當然。以數字 N 標記您的訊息,然後在其中添加遞增的數字」。從那時起,當您或 IP 92.25.12.37 想要彼此通訊時,就可以以有意義的方式訂購紙條、要求遺失的紙條、回覆它們等等。

這樣,我們可以使用一個信箱(或連接埠)並保持我們所有的通訊良好。這就是 TCP 的優點。它增加了一些額外負擔,但確保一切都井然有序、正確交付等等。

如果您不喜歡這些類比,請不要絕望,因為我們現在將直接了解如何使用 Erlang 的 TCP 和 UDP socket。這應該會更簡單。

UDP Socket

UDP 只有幾個基本操作:設定 socket、傳送訊息、接收訊息和關閉連線。可能性有點像這樣

A graph showing that Opening a socket can lead to 3 options: sending data, receiving data, or closing a socket. Sending can lead to receiving data or closing a socket, receiving data can lead to sending data or closing a socket. Finally, closing a socket does nothing

無論如何,第一個操作是開啟 socket。這是透過呼叫 gen_udp:open/1-2 來完成的。最簡單的形式是透過呼叫 {ok, Socket} = gen_udp:open(PortNumber) 來完成的。

連接埠號碼將是 1 到 65535 之間的任何整數。從 0 到 1023,這些連接埠被稱為系統連接埠。大多數情況下,除非您擁有管理權限,否則您的作業系統會讓您無法監聽系統連接埠。從 1024 到 49151 的連接埠是已註冊的連接埠。它們通常不需要任何權限並且可以免費使用,儘管其中一些連接埠 已註冊到眾所周知的服務。然後,其餘的連接埠被稱為動態私有連接埠。它們經常被用於臨時連接埠。對於我們的測試,我們將採用一些相對安全的連接埠號碼,例如 8789,不太可能被佔用。

但在那之前,gen_udp:open/2 怎麼樣?第二個參數可以是選項的列表,指定我們要接收資料的類型(listbinary),我們希望如何接收它們;作為訊息 ({active, true}) 或作為函數呼叫的結果 ({active, false})。還有更多選項,例如是否應將 socket 設定為 IPv4 (inet4) 或 IPv6 (inet6)、是否可以使用 UDP socket 廣播資訊 ({broadcast, true | false})、緩衝區的大小等。還有更多可用的選項,但我們現在將堅持簡單的東西,因為理解其餘的內容取決於您自己學習。這個主題很快就會變得複雜,而本指南是關於 Erlang,而不是 TCP 和 UDP,很遺憾。

因此,讓我們開啟一個 socket。首先啟動一個給定的 Erlang shell

1> {ok, Socket} = gen_udp:open(8789, [binary, {active,true}]). 
{ok,#Port<0.676>}
2> gen_udp:open(8789, [binary, {active,true}]).
{error,eaddrinuse}

在第一個命令中,我開啟 socket,命令它傳回我二進制資料,並且我希望它是活動的。您可以看到傳回了一個新的資料結構:#Port<0.676>。這是我們剛剛開啟的 socket 的表示形式。它們可以像 Pids 一樣使用:您甚至可以設定它們的連結,以便在發生崩潰時將故障傳播到 socket!第二個函數呼叫嘗試再次開啟相同的 socket,這是不可行的。這就是傳回 {error, eaddrinuse} 的原因。幸運的是,第一個 Socket socket 仍然開啟。

無論如何,我們將啟動第二個 Erlang shell。在該 shell 中,我們將使用不同的連接埠號碼開啟第二個 UDP socket

1> {ok, Socket} = gen_udp:open(8790).
{ok,#Port<0.587>}
2> gen_udp:send(Socket, {127,0,0,1}, 8789, "hey there!").
ok

啊,一個新功能!在第二次呼叫時,會使用 gen_udp:send/4 來傳送訊息(真是個描述性十足的名稱)。參數依序為:gen_udp:send(OwnSocket, RemoteAddress, RemotePort, Message)RemoteAddress 可以是字串或包含網域名稱("example.org")的原子,或是描述 IPv4 位址的 4 元組,或是描述 IPv6 位址的 8 元組。然後我們指定接收者的埠號(我們要將紙條丟進哪個信箱?),然後是訊息,訊息可以是字串、二進位或 IO 列表。

訊息真的有被傳送出去嗎?回到你的第一個 shell 並嘗試刷新資料

3> flush().
Shell got {udp,#Port<0.676>,{127,0,0,1},8790,<<"hey there!">>}
ok

太棒了。開啟 socket 的程序會接收到形式為 {udp, Socket, FromIp, FromPort, Message} 的訊息。透過這些欄位,我們就能知道訊息來自何處、經過哪個 socket 以及內容為何。所以我們已經涵蓋了開啟 socket、傳送資料以及在主動模式下接收資料。那被動模式呢?為此,我們需要從第一個 shell 關閉 socket 並開啟一個新的 socket。

4> gen_udp:close(Socket).
ok
5> f(Socket).
ok
6> {ok, Socket} = gen_udp:open(8789, [binary, {active,false}]).
{ok,#Port<0.683>}

所以這裡,我們關閉 socket,解除綁定 Socket 變數,然後在再次開啟 socket 時將其綁定,這次是處於被動模式。在回傳訊息之前,請嘗試以下操作

7> gen_udp:recv(Socket, 0).

你的 shell 應該會卡住。這裡的函式是 recv/2。這個函式用來輪詢被動 socket 以接收訊息。這裡的 0 是我們想要的訊息長度。有趣的是,長度在 gen_udp 中完全被忽略。gen_tcp 有一個類似的函式,在這種情況下,它確實會產生影響。無論如何,如果我們從未傳送訊息,recv/2 永遠不會返回。回到第二個 shell 並傳送一個新訊息

3> gen_udp:send(Socket, {127,0,0,1}, 8789, "hey there!").
ok

然後第一個 shell 應該會印出 {ok,{{127,0,0,1},8790,<<"hey there!">>}} 作為返回值。如果你不想永遠等待怎麼辦?只要加上一個逾時值即可

8> gen_udp:recv(Socket, 0, 2000).
{error,timeout}

這就是 UDP 的大部分內容。真的!

TCP Sockets

雖然 TCP socket 與 UDP socket 有很大一部分介面是相同的,但在它們的運作方式上有一些重要的差異。最大的差異在於用戶端和伺服器是兩個完全不同的東西。用戶端將會按照以下操作執行

A diagram similar to the UDP one: connection leads to send and receive, which both send to each other. More over, all states can then lead to the closed state

而伺服器則會遵循這個流程

Diagram similar to the UDP one, although a listen state is added before the whole thing. That state can either move on to the 'accept' state (similar to 'open socket' for the possible branches) or to a close state.

看起來很奇怪,對吧?用戶端的行為有點像我們在 gen_udp 中所做的那樣:你連線到一個埠,傳送和接收,然後停止這樣做。但是,在服務時,我們有一個新的模式:監聽。這是因為 TCP 設定會話的方式。

首先,我們開啟一個新的 shell 並使用 gen_tcp:listen(Port, Options) 啟動一個名為監聽 socket 的東西

1> {ok, ListenSocket} = gen_tcp:listen(8091, [{active,true}, binary]).
{ok,#Port<0.661>}

監聽 socket 僅負責等待連線請求。你可以看到我使用了與 gen_udp 類似的選項。這是因為大多數選項對於所有 IP socket 都是相似的。TCP 選項確實有一些更特定的選項,包括連線佇列({backlog, N})、保持連線 socket({keepalive, true | false})、封包封裝({packet, N},其中 N 是每個封包標頭要被剝離和剖析的長度)等等。

一旦監聽 socket 開啟,任何程序(而且可以有多個)都可以取得監聽 socket 並進入「接受」狀態,直到有用戶端請求與其通訊時才被鎖定

2> {ok, AcceptSocket} = gen_tcp:accept(ListenSocket, 2000).
** exception error: no match of right hand side value {error,timeout}
3> {ok, AcceptSocket} = gen_tcp:accept(ListenSocket).
** exception error: no match of right hand side value {error,closed}

糟糕。我們逾時然後崩潰了。當與其關聯的 shell 程序消失時,監聽 socket 就會關閉。讓我們重新開始,這次沒有 2 秒(2000 毫秒)的逾時時間

4> f().
ok
5> {ok, ListenSocket} = gen_tcp:listen(8091, [{active, true}, binary]).
{ok,#Port<0.728>}
6> {ok, AcceptSocket} = gen_tcp:accept(ListenSocket).

然後程序被鎖定了。太棒了!讓我們開啟第二個 shell

1> {ok, Socket} = gen_tcp:connect({127,0,0,1}, 8091, [binary, {active,true}]). 
{ok,#Port<0.596>}

這個 shell 仍然採用與往常相同的選項,如果你不想永遠等待,可以在最後一個位置新增 Timeout 引數。如果你回頭看第一個 shell,它應該會返回 {ok, SocketNumber}。從那時起,接受 socket 和用戶端 socket 就可以像 gen_udp 一樣,一對一地進行通訊。使用第二個 shell 向第一個 shell 傳送訊息

3> gen_tcp:send(Socket, "Hey there first shell!").
ok

然後從第一個 shell

7> flush().
Shell got {tcp,#Port<0.729>,<<"Hey there first shell!">>}
ok

兩個 socket 可以用相同的方式傳送訊息,然後可以用 gen_tcp:close(Socket) 關閉。請注意,關閉接受 socket 只會關閉該 socket,而關閉監聽 socket 不會關閉任何相關且已建立的接受 socket,但會透過返回 {error, closed} 來中斷目前正在執行的 accept 呼叫。

這就是 Erlang 中大多數 TCP socket 的內容!但真的是這樣嗎?

啊,是的,當然,還有更多可以做的事情。如果你自己做了一些 socket 實驗,你可能會注意到 socket 有某種所有權。

我的意思是說,UDP socket、TCP 用戶端 socket 和 TCP 接受 socket 都可以透過任何現有的程序傳送訊息,但接收到的訊息只能由啟動 socket 的程序讀取

A diagram that shows that all processes can send to a socket, but only the owner can receive messages

這不是很實用,對吧?這表示我們必須始終保持所有者程序處於活動狀態才能轉發訊息,即使它與我們的需求無關。如果能夠做到這樣不是很好嗎?

    1.  Process A starts a socket
    2.  Process A sends a request
    3.  Process A spawns process B
        with a socket
    4a. Gives ownership of the      4b. Process B handles the request
        socket to Process B
    5a. Process A sends a request   5b. Process B Keeps handling
                                        the request
    6a. Process A spawns process C  6b. ...
        with a socket
        ...

在這裡,A 將負責執行一堆查詢,但每個新程序將負責等待回覆、處理回覆等等。因此,A 委派一個新程序來執行任務會很聰明。這裡的棘手部分是放棄 socket 的所有權。

這是訣竅。gen_tcp 和 gen_udp 都包含一個名為 controlling_process(Socket, Pid) 的函式。此函式必須由目前 socket 所有者呼叫。然後程序會告訴 Erlang「你知道嗎?就讓這個 Pid 傢伙接管我的 socket 吧。我放棄」。從現在開始,函式中的 Pid 就可以讀取和接收來自 socket 的訊息。就是這樣。

更多使用 Inet 的控制

所以現在我們了解如何開啟 socket、透過 socket 傳送訊息、變更所有權等等。我們也知道如何在被動和主動模式下監聽訊息。回到 UDP 範例中,當我想要從主動模式切換到被動模式時,我重新啟動了 socket、清除了變數然後繼續。這相當不實用,尤其是在我們想要在使用 TCP 時執行相同操作時,因為我們必須中斷活動的會話。

幸運的是,有一個名為 inet 的模組,負責處理 gen_tcp 和 gen_udp socket 都通用的所有操作。對於我們手邊的問題,也就是在主動和被動模式之間變更,有一個名為 inet:setopts(Socket, Options) 的函式。選項清單可以包含在設定 socket 時使用的任何詞彙。

注意:請小心!有一個名為 inet 的模組和一個名為 inets 的模組。inet 是我們在這裡想要的模組。inets 是一個 OTP 應用程式,其中包含一堆預先寫好的服務和伺服器(包括 FTP、簡易 FTP (TFTP)、HTTP 等)。

區分它們的一個簡單技巧是,inets 是關於建立在 inet 之上的services,或者如果你喜歡,inet + s(ervices)。

啟動一個 shell 作為 TCP 伺服器

1> {ok, Listen} = gen_tcp:listen(8088, [{active,false}]).
{ok,#Port<0.597>}
2> {ok, Accept} = gen_tcp:accept(Listen).

在第二個 shell 中

1> {ok, Socket} = gen_tcp:connect({127,0,0,1}, 8088, []).
{ok,#Port<0.596>}
2> gen_tcp:send(Socket, "hey there").
ok

然後回到第一個 shell,應該已接受該 socket。我們刷新以查看是否有任何內容

3> flush().
ok

當然沒有,我們處於被動模式。讓我們修正這個問題

4> inet:setopts(Accept, [{active, true}]).
ok
5> flush().
Shell got {tcp,#Port<0.598>,"hey there"}
ok

是!透過完全控制主動和被動 socket,力量掌握在我們手中。我們如何選擇主動和被動模式?

A stop sign

好吧,有很多重點。一般而言,如果你正在立即等待訊息,被動模式會快得多。Erlang 不必為了處理事情而擺弄你程序的信箱,你也不必掃描該信箱、提取訊息等等。使用 recv 會更有效率。但是,recv 會將你的程序從事件驅動變更為主動輪詢 — 如果你必須在 socket 和其他 Erlang 程式碼之間扮演中間人的角色,這可能會使事情變得有點複雜。

在這種情況下,切換到主動模式會是個好主意。如果封包以訊息的形式傳送,你只需在 receive(或 gen_server 的 handle_info 函式)中等待並處理訊息即可。除了速度之外,這項操作的缺點與速率限制有關。

概念是,如果所有來自外部世界的封包都被 Erlang 盲目接受,然後轉換為訊息,那麼外部世界的人就很容易可以透過封包洪流來癱瘓它。被動模式的優點是限制了訊息可以放入 Erlang VM 的方式和時間,並將封鎖、排隊和捨棄訊息的任務委派給較低層級的實作。

那麼如果我們需要主動模式的語義,但又需要被動模式的安全性怎麼辦?我們可以嘗試使用 inet:setopts/2 快速地在被動和主動之間切換,但這對競爭條件來說相當危險。相反地,有一種名為active once 的模式,選項為 {active, once}。讓我們嘗試看看它的運作方式。

保留先前伺服器的 shell

6> inet:setopts(Accept, [{active, once}]).
ok

現在進入用戶端 shell 並再執行兩個 send/2 呼叫

3> gen_tcp:send(Socket, "one").
ok
4> gen_tcp:send(Socket, "two").
ok

然後回到伺服器 shell

7> flush().
Shell got {tcp,#Port<0.598>,"one"}
ok
8> flush().
ok
9> inet:setopts(Accept, [{active, once}]).
ok
10> flush().
Shell got {tcp,#Port<0.598>,"two"}
ok

看到了嗎?在我們第二次要求 {active, once} 之前,訊息 "two" 尚未轉換為訊息,這表示 socket 回到了被動模式。因此,active once 模式允許我們以安全的方式在主動和被動之間來回切換。良好的語義,加上安全性。

inet 中還有其他不錯的函式。用於讀取統計資料、取得目前主機資訊、檢查 socket 等等的功能。

好了,這就是 socket 的大部分內容。現在是時候付諸實踐了。

注意:在網際網路的荒野中,你有許多函式庫可以透過大量的協定來做到這一點:HTTP、0mq、原始 unix socket 等等。它們全部都可用。但是,標準 Erlang 發行版隨附兩個主要選項,即 TCP 和 UDP socket。它還附帶一些 HTTP 伺服器和剖析程式碼,但它並不是最有效率的東西。

更新
從 17.0 版本開始,現在可以設定讓埠口在 N 個封包內保持啟用。TCP 和 UDP 埠口新增了 {active, N} 選項,其中 N 可以是 0 到 32767 之間的任何值。一旦剩餘訊息計數器達到 0,或是透過 inet:setopts/2 明確設定為 0,socket 就會轉換為被動 ({active, false}) 模式。屆時,會傳送訊息到 socket 的控制程序,通知它轉換的狀態。對於 TCP,訊息將會是 {tcp_passive, Socket},而對於 UDP,則會是 {udp_passive, Socket}

當多次呼叫此函式時,每個新值都會加到總計數器。呼叫 {active, 3} 三次,會使其最多傳送 9 個訊息到控制程序。 N 值也可以是負數,以強制遞減計數器。如果最終值低於 0,Erlang 會無聲地將其設定為 0 並轉換為被動模式。

A cup of coffee with cookies and a spoon. Text says 'take a break'

Sockserv,重新檢視

本章不會介紹太多新程式碼。相反地,我們會回顧上一章 Process Quest 中的 sockserv 伺服器。它是一個完全可行的伺服器,我們將會看到如何在 OTP 監管樹中的 gen_server 內處理 TCP 連線。

TCP 伺服器的簡單實作可能看起來像這樣

-module(naive_tcp).
-compile(export_all).

start_server(Port) ->
    Pid = spawn_link(fun() ->
        {ok, Listen} = gen_tcp:listen(Port, [binary, {active, false}]),
        spawn(fun() -> acceptor(Listen) end),
        timer:sleep(infinity)
    end),
    {ok, Pid}.

acceptor(ListenSocket) ->
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    spawn(fun() -> acceptor(ListenSocket) end),
    handle(Socket).

%% Echoing back whatever was obtained
handle(Socket) ->
    inet:setopts(Socket, [{active, once}]),
    receive
        {tcp, Socket, <<"quit", _/binary>>} ->
            gen_tcp:close(Socket);
        {tcp, Socket, Msg} ->
            gen_tcp:send(Socket, Msg),
            handle(Socket)
    end.

為了理解它是如何運作的,以下圖示可能會有所幫助

A diagram showing the first process (P1) spawning a listen socket and a first acceptor process (P2). The first acceptor can accept request, handle messages, and then spawn a new acceptor process (P3) that does the same as P2

因此,start_server 函式會開啟一個監聽 socket,產生一個接受器,然後就永遠閒置。閒置是必要的,因為監聽 socket 會繫結到開啟它的程序,因此只要我們想處理連線,該程序就必須保持存活。每個接受器程序都會等待連線接受。一旦有連線進來,接受器程序就會啟動一個新的類似程序,並與其共享監聽 socket。然後,它可以繼續處理其他事情,而新的程序則在工作。每個處理器都會重複它收到的所有訊息,直到其中一個訊息以 "quit" 開頭 — 然後連線就會關閉。

注意: <<"quit", _/binary>> 模式表示我們首先要比對一個包含字元 quit 的二進位字串,加上一些我們不關心的二進位資料 (_)。

在 Erlang shell 中執行 naive_tcp:start_server(8091). 來啟動伺服器。然後開啟一個 telnet 用戶端(請記住,telnet 用戶端在技術上並非用於原始 TCP,但可以作為測試伺服器的良好用戶端,而無需編寫一個用戶端),您會看到以下情況發生

$ telnet localhost 8091
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hey there
hey there
that's what I asked
that's what I asked
stop repeating >:(
stop repeating >:(
quit doing that!
Connection closed by foreign host.

太棒了。現在可以成立一家名為Poople Inc.的新公司,並使用這樣的伺服器推出幾個社群網路。只是如同模組名稱所說,這是一個簡單的實作。程式碼很簡單,但並未考慮到平行處理。如果所有請求都一個接一個地進來,那麼這個簡單的伺服器可以正常運作。但是,如果同時有 15 個人想要連線到伺服器,會發生什麼事呢?

然後一次只能回覆一個查詢,這是因為每個程序都必須先等待連線、設定它,然後產生一個新的接受器。佇列中的第 15 個請求必須等待其他 14 個連線設定完成,才有機會要求與我們的伺服器進行交談。如果您使用的是生產伺服器,它可能更接近每秒 500 到 1000 個查詢。這是不可行的。

我們需要改變的是目前的循序工作流程

A diagram showing in order, a listen operation, then a bunch of 'accepts' coming one after the other in a chain

變成更平行的流程

A diagram showing in order, a listen operation, then a bunch of 'accepts' coming under the listen operation

透過讓許多接受器隨時待命,我們將可以減少回答新查詢的大量延遲。現在,我們將不深入探討另一個演示實作,而是研究上一章的 sockserv-1.0.1。探索基於實際 OTP 元件和真實世界實務的內容會更好。事實上,sockserv 的一般模式與 cowboy(儘管 cowboy 無疑比 sockserv 更可靠)和 etorrent torrent 用戶端等伺服器中使用的模式相同。

為了建立這個 Process Quest 的 sockserv,我們將從上而下進行。我們需要的架構必須是一個具有多個工作者的監管器。如果我們看看上面的平行圖,監管器應該持有監聽 socket 並將其共享給所有工作者,這些工作者將負責接受連線。

我們如何編寫一個可以跨所有工作者共享事物的監管器?使用常規監管是無法做到的:所有子程序都是完全獨立的,無論您使用 one_for_oneone_for_allrest_for_one 監管。一個自然的反應可能是轉向某些全域狀態:一個註冊的程序,它只持有監聽 socket 並將其傳遞給處理器。您必須對抗這種反應並變得聰明。使用原力(以及回頭閱讀監管器章節的能力)。您有 2 分鐘的時間思考解決方案(兩分鐘的計時是基於榮譽制度。自行計時。)

訣竅在於使用 simple_one_for_one 監管器。因為 simple_one_for_one 監管器會與其所有子程序共享子程序規格,我們需要做的就是將監聽 socket 推入其中,供所有子程序存取!

以下是監管器的完整程式碼

%%% The supervisor in charge of all the socket acceptors.
-module(sockserv_sup).
-behaviour(supervisor).

-export([start_link/0, start_socket/0]).
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    {ok, Port} = application:get_env(port),
    %% Set the socket into {active_once} mode.
    %% See sockserv_serv comments for more details
    {ok, ListenSocket} = gen_tcp:listen(Port, [{active,once}, {packet,line}]),
    spawn_link(fun empty_listeners/0),
    {ok, {{simple_one_for_one, 60, 3600},
         [{socket,
          {sockserv_serv, start_link, [ListenSocket]}, % pass the socket!
          temporary, 1000, worker, [sockserv_serv]}
         ]}}.

start_socket() ->
    supervisor:start_child(?MODULE, []).

%% Start with 20 listeners so that many multiple connections can
%% be started at once, without serialization. In best circumstances,
%% a process would keep the count active at all times to insure nothing
%% bad happens over time when processes get killed too much.
empty_listeners() ->
    [start_socket() || _ <- lists:seq(1,20)],
    ok.

這裡發生了什麼事?標準的 start_link/0init/1 函式都在這裡。您可以看到 sockserv 取得 simple_one_for_one 的重新啟動策略,並且子程序規格傳遞了 ListenSocket。每個以 start_socket/0 啟動的子程序都會預設將其作為參數。神奇吧!

只有這樣是不夠的。我們希望應用程式能夠儘快為查詢提供服務。這就是我加入呼叫 spawn_link(fun empty_listeners/0) 的原因。 empty_listeners/0 函式將啟動 20 個處理器,這些處理器將被鎖定並等待傳入的連線。我將它放在 spawn_link/1 呼叫中,原因很簡單:監管器程序處於其 init/1 階段,無法回覆任何訊息。如果我們從 init 函式內呼叫自己,程序將會死結,永遠無法完成執行。為此,需要一個外部程序。

注意: 在上面的程式碼片段中,您會注意到我將選項 {packet, line} 傳遞給 gen_tcp。此選項會讓所有接收到的封包分成不同的行,並根據該行進行排隊(行尾仍將是接收到的字串的一部分)。這將有助於確保在我們的案例中,telnet 用戶端能更好地運作。但是請注意,長度超過接收緩衝區的行可能會被分割成許多封包,因此兩個封包有可能代表一行。驗證接收到的內容是否以換行符結尾,可以讓您知道該行是否結束。

所以,沒錯,那是最棘手的部分。現在我們可以專注於編寫工作者本身。

如果您還記得上一章的 Process Quest 會議,事情是這樣發生的

  1. 使用者連線到伺服器
  2. 伺服器要求角色的名稱
  3. 使用者傳送角色名稱
  4. 伺服器建議屬性
    1. 使用者拒絕,回到第 4 點
    2. 使用者接受,前往第 6 點
  5. 遊戲將事件傳送給玩家,直到
  6. 使用者將 quit 傳送給伺服器,或強制關閉 socket

這表示我們的伺服器程序將有兩種輸入:來自 Process Quest 應用程式的輸入和來自使用者的輸入。來自使用者的資料將透過 socket 進行,因此將在我們的 gen_server 的 handle_info/2 函式中處理。來自 Process Quest 的資料可以透過我們控制的方式傳送,因此由 handle_cast 處理的投射會很有意義。首先,我們必須啟動伺服器

-module(sockserv_serv).
-behaviour(gen_server).

-record(state, {name, % player's name
                next, % next step, used when initializing
                socket}). % the current socket

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         code_change/3, terminate/2]).

首先是一個相當標準的 gen_server 回呼模組。這裡唯一特別的是狀態包含角色的名稱、socket 和一個名為 next 的欄位。 next 部分是一個用來儲存與伺服器狀態相關的臨時資訊的萬用欄位。這裡或許也可以使用 gen_fsm 而不會有太多問題。

對於實際的伺服器啟動

-define(TIME, 800).
-define(EXP, 50).

start_link(Socket) ->
    gen_server:start_link(?MODULE, Socket, []).

init(Socket) ->
    %% properly seeding the process
    <<A:32, B:32, C:32>> = crypto:rand_bytes(12),
    random:seed({A,B,C}),
    %% Because accepting a connection is a blocking function call,
    %% we can not do it in here. Forward to the server loop!
    gen_server:cast(self(), accept),
    {ok, #state{socket=Socket}}.

%% We never need you, handle_call!
handle_call(_E, _From, State) ->
    {noreply, State}.

上面定義的兩個巨集 (?TIME?EXP) 是特殊的參數,可以設定動作之間的基準延遲時間 (800 毫秒) 和達到第二級所需的經驗值 (50,在每個等級後加倍)。

您會注意到 start_link/1 函式會取得一個 socket。那是從 sockserv_sup 傳入的監聽 socket。

關於隨機種子的第一點是為了確保正確地為程序設定種子,以便稍後產生角色統計資料。否則,許多程序將使用一些預設值,而我們不希望這樣。我們在 init/1 函式而不是任何使用隨機數的函式庫中進行初始化的原因是,種子會儲存在程序層級 (該死的!可變狀態!),我們不希望在每次函式庫呼叫時都設定新的種子。

無論如何,真正重要的是我們向自己投射了一個訊息。這樣做的原因是 gen_tcp:accept/1-2 是一個封鎖式操作,再加上所有 init 函式都是同步的。如果我們等待 30 秒來接受連線,啟動該程序的監管器也將被鎖定 30 秒。因此,沒錯,我們向自己投射了一個訊息,然後將監聽 socket 加入狀態的 socket 欄位。

不要喝太多 Kool-Aid
如果您閱讀其他人的程式碼,您會經常看到人們使用 now() 的結果來呼叫 random:seed/1now() 是一個很好的函式,因為它會傳回單調時間(總是增加,永遠不會重複)。然而,它是 Erlang 中使用的隨機演算法的一個糟糕的種子值。因此,最好使用 crypto:rand_bytes(12) 來產生 12 個密碼安全的隨機位元組(如果您使用 R14B03+,請使用 crypto:strong_rand_bytes(12))。透過執行 <<A:32, B:32, C:32>>,我們將 12 個位元組轉換為 3 個要傳入的整數。

更新
從 18.0 版本開始,引入了 rand 模組,其中包含比 random 模組更好的虛擬隨機演算法,而且不需要設定種子。

我們需要接受該連線。別再閒晃了

handle_cast(accept, S = #state{socket=ListenSocket}) ->
    {ok, AcceptSocket} = gen_tcp:accept(ListenSocket),
    %% Remember that thou art dust, and to dust thou shalt return.
    %% We want to always keep a given number of children in this app.
    sockserv_sup:start_socket(), % a new acceptor is born, praise the lord
    send(AcceptSocket, "What's your character's name?", []),
    {noreply, S#state{socket=AcceptSocket, next=name}};

我們接受連線,啟動一個替代接受器(以便我們始終有大約 20 個接受器準備好處理新連線),然後將接受 socket 儲存為 ListenSocket 的替代,並記錄我們透過 socket 收到的下一個訊息是關於名稱,並帶有 'next' 欄位。

但在繼續之前,我們透過 send 函式向用戶端傳送一個問題,定義如下

send(Socket, Str, Args) ->
    ok = gen_tcp:send(Socket, io_lib:format(Str++"~n", Args)),
    ok = inet:setopts(Socket, [{active, once}]),
    ok.

詭計!因為我預期我們在收到訊息後幾乎總是需要回覆,所以我在該函式內執行了一次啟用的常式,並且也在其中加入了換行符。只是懶惰地鎖在一個函式中。

我們已經完成了步驟 1 和 2,現在我們必須等待來自 socket 的使用者輸入

handle_info({tcp, _Socket, Str}, S = #state{next=name}) ->
    Name = line(Str),
    gen_server:cast(self(), roll_stats),
    {noreply, S#state{name=Name, next=stats}};

我們不知道 Str 字串中會有什麼,但沒關係,因為狀態的 next 欄位可以讓我們知道我們收到的任何內容都是名稱。因為我預期使用者會使用 telnet 進行演示應用程式,所以我們將接收到的所有文字位元都將包含行尾。 line/1 函式(定義如下)會將其移除

%% Let's get rid of the white space and ignore whatever's after.
%% makes it simpler to deal with telnet.
line(Str) ->
    hd(string:tokens(Str, "\r\n ")).

一旦我們收到該名稱,我們會將其儲存,然後向自己投射一個訊息 (roll_stats) 以產生玩家的統計資料,這是下一步。

注意: 如果您查看檔案,您會發現我並不是比對整個訊息,而是使用較短的 ?SOCK(Var) 巨集。該巨集定義為 -define(SOCK(Msg), {tcp, _Port, Msg}).,這只是像我一樣懶惰的人用稍微少一點的打字來比對字串的快速方法。

統計數據滾動結果會回到 handle_cast 子句中

handle_cast(roll_stats, S = #state{socket=Socket}) ->
    Roll = pq_stats:initial_roll(),
    send(Socket,
         "Stats for your character:~n"
         "  Charisma: ~B~n"
         "  Constitution: ~B~n"
         "  Dexterity: ~B~n"
         "  Intelligence: ~B~n"
         "  Strength: ~B~n"
         "  Wisdom: ~B~n~n"
         "Do you agree to these? y/n~n",
         [Points || {_Name, Points} <- lists:sort(Roll)]),
    {noreply, S#state{next={stats, Roll}}};
two dice, with a 5 rolled on each

pq_stats 模組包含滾動統計數據的函式,而整個子句僅用於在那裡輸出統計數據。 ~B 格式參數表示我們希望印出一個整數。狀態的 next 部分在這裡有點過度使用。因為我們會詢問使用者是否同意,所以我們必須等待他們告訴我們,然後要麼放棄統計數據並產生新的統計數據,要麼將它們傳遞給我們很快就會開始的 Process Quest 角色。

讓我們這次在 handle_info 函式中監聽使用者輸入

handle_info({tcp, Socket, Str}, S = #state{socket=Socket, next={stats, _}}) ->
    case line(Str) of
        "y" ->
            gen_server:cast(self(), stats_accepted);
        "n" ->
            gen_server:cast(self(), roll_stats);
        _ -> % ask again because we didn't get what we wanted
            send(Socket, "Answer with y (yes) or n (no)", [])
    end,
    {noreply, S};

在這個直接的函式子句中啟動角色可能會很吸引人,但我決定不這麼做:handle_info 是處理使用者輸入,handle_cast 則處理 Process Quest 的事情。職責分離!如果使用者拒絕統計數據,我們就再次呼叫 roll_stats。沒有什麼新鮮事。當使用者接受時,我們就可以啟動 Process Quest 角色,並開始等待來自那裡的事件

%% The player has accepted the stats! Start the game!
handle_cast(stats_accepted, S = #state{name=Name, next={stats, Stats}}) ->
    processquest:start_player(Name, [{stats,Stats},{time,?TIME},
                                     {lvlexp, ?EXP}]),
    processquest:subscribe(Name, sockserv_pq_events, self()),
    {noreply, S#state{next=playing}};

這些是我為遊戲定義的常規呼叫。您啟動一個玩家,並使用 sockserv_pq_events 事件處理器訂閱事件。下一個狀態是 playing,這表示接收到的所有訊息很可能來自遊戲

%% Events coming in from process quest
%% We know this because all these events' tuples start with the
%% name of the player as part of the internal protocol defined for us
handle_cast(Event, S = #state{name=N, socket=Sock}) when element(1, Event) =:= N ->
    [case E of
       {wait, Time} -> timer:sleep(Time);
       IoList -> send(Sock, IoList, [])
     end || E <- sockserv_trans:to_str(Event)], % translate to a string
    {noreply, S}.

我不會太深入探討這是如何運作的細節。只需知道 sockserv_trans:to_str(Event) 會將一些遊戲事件轉換為 IO 列表或 {wait, Time} 元組,這些元組表示事件各部分之間要等待的延遲時間(我們會在顯示敵人掉落的物品之前先印出 執行 a ... 訊息)。

如果您回想一下要遵循的步驟列表,我們已經涵蓋了除了一個之外的所有步驟。當使用者告訴我們他們想要退出時就退出。將以下子句作為 handle_info 中的頂部子句

handle_info({tcp, _Socket, "quit"++_}, S) ->
    processquest:stop_player(S#state.name),
    gen_tcp:close(S#state.socket),
    {stop, normal, S};

停止角色、關閉 socket、終止進程。萬歲。其他退出的原因包括 TCP socket 被客戶端關閉

handle_info({tcp_closed, _Socket}, S) ->
    {stop, normal, S};
handle_info({tcp_error, _Socket, _}, S) ->
    {stop, normal, S};
handle_info(E, S) ->
    io:format("unexpected: ~p~n", [E]),
    {noreply, S}.

我也新增了一個額外的子句來處理未知的訊息。如果使用者輸入了我們預期之外的內容,我們不希望崩潰。只剩下 terminate/2code_change/3 函式需要完成

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

terminate(normal, _State) ->
    ok;
terminate(_Reason, _State) ->
    io:format("terminate reason: ~p~n", [_Reason]).

如果您完成了整個過程,您可以嘗試編譯此檔案,並將其替換為我們版本中對應的 beam 檔案,看看它是否運作良好。如果您複製的內容正確(我也複製的正確),它應該可以運作。

接下來該怎麼做?

如果您接受的話,您的下一個任務是向客戶端新增一些您選擇的命令:為什麼不新增像 '暫停' 這樣的命令,它會將動作排隊一段時間,然後在您恢復伺服器時全部輸出?或者,如果您夠厲害,在 sockserv_serv 模組中記下您目前為止的等級和統計數據,並新增從客戶端提取它們的命令。我一直很討厭留給讀者的練習,但有時候就是太想在這裡隨便丟一個,所以請享用!

否則,閱讀現有伺服器實作的原始碼、自己編寫一些程式等等都是很好的練習。很少有語言可以像寫一個 Web 伺服器一樣作為業餘愛好者的練習,但 Erlang 就是其中之一。練習一下,它就會變成像第二本能一樣。Erlang 與外界溝通只是我們朝向編寫有用的軟體邁出的許多步驟之一。