如何創建長時間運行的作業
如果 Web 應用程序中的一個特性需要超過 1 秒或 2 秒才能完成,那么應該怎么辦?需要某種離線處理解決方案。學習幾種對 PHP 應用程序中長時間運行的作業進行離線服務的方法。
大型的連鎖店有一個大問題。每天,在每家商店會發生數千次交易。公司執行官希望對這些數據進行挖掘。哪些產品賣得好?哪些不好?有機產品在哪里賣得好?冰淇淋的銷售情況怎么樣?
為了捕捉這些數據,組織必須將所有事務性數據裝載進一個數據模型,以便更適合生成公司所需的報告類型。但是,這很花費時間,而且隨著連鎖規模的增長,處理一天的數據可能要花費一天以上的時間。因此,這是個大問題。
現在,您的 Web 應用程序可能不需要處理這么多數據,但是任何站點的處理時間都有可能超過客戶愿意等待的時間。一般來說,客戶愿意等待的時間是 200 毫秒,如果超過這個時間,客戶就會覺得過程 “緩慢”。這個數字基于桌面應用程序,而 Web 使我們更有耐心了。但無論如何,不應該讓客戶等待的時間超過幾秒。所以,要采用一些策略來處理 PHP 中的批處理作業。
分散的方式與 cron
在 UNIX® 機器上,執行批處理的核心程序是 cron 守護進程。這個守護進程讀取一個配置文件,這個文件會告訴它要運行哪些命令行以及運行的頻率。然后,這個守護進程就按照配置執行它們。在遇到錯誤時,它甚至能夠向指定的電子郵件地址發送錯誤輸出,從而幫助對問題進行調試。
我知道一些工程師強烈主張使用線程技術。“線程!線程才是進行后臺處理的真正方法。cron 守護進程太過時了。”
我不這么認為。
這兩種方法我都用過,我認為 cron 具備 “Keep It Simple, Stupid(KISS,簡單就是美)” 原則的優點。它使后臺處理保持簡單。不需要編寫一直運行的多線程的作業處理應用程序(因此不會有內存泄漏),而是由 cron 啟動一個簡單的批處理腳本。這個腳本判斷是否有作業要處理,執行作業,然后退出。不需要擔心內存泄漏。也不需要擔心線程停止或陷入無限循環。
那么,cron 是如何工作的?這依賴于您所處的系統環境。我只討論老式簡單的 cron 的 UNIX 命令行版本,您可以向系統管理員咨詢如何在自己的 Web 應用程序中實現它。
下面是一個簡單的 cron 配置,它在每天晚上 11 點運行一個 PHP 腳本:
0 23 * * * jack /usr/bin/php /users/home/jack/myscript.php
前 5 個字段定義應該啟動腳本的時間。然后是應該用來運行這個腳本的用戶名。其余的命令是要執行的命令行。時間字段分別是分、小時、月中的日、月和周中的日。下面是幾個示例。
命令:
15 * * * * jack /usr/bin/php /users/home/jack/myscript.php
在每個小時的第 15 分鐘運行腳本。
命令:
15,45 * * * * jack /usr/bin/php /users/home/jack/myscript.php
在每個小時的第 15 和第 45 分鐘運行腳本。
命令:
*/1 3-23 * * * jack /usr/bin/php /users/home/jack/myscript.php
在早上 3 點到晚上 11 點之間的每分鐘運行腳本。
命令
30 23 * * 6 jack /usr/bin/php /users/home/jack/myscript.php
在每星期六的晚上 11:30 運行腳本(星期六由 6 指定)。
可以看到,組合的數量是無限的。可以根據需要控制運行腳本的時間。還可以指定多個要運行的腳本,這樣的話,一些腳本可以每分鐘都運行,而其他腳本(比如備份腳本)可以每天只運行一次。
為了指定將報告的錯誤發送到哪個電子郵件地址,可以使用 MAILTO 指令,如下所示:
MAILTO=jherr@pobox.com
注意:對于 Microsoft® Windows® 用戶,有一個等效的 Scheduled Tasks 系統可以用來定期啟動命令行進程(比如 PHP 腳本)。
批處理體系結構的基礎知識
批處理是相當簡單的。在大多數情況下,采用兩個工作流之一。第一個工作流用于進行報告;腳本每天運行一次,它生成報告并將報告發送給一組用戶。第二個工作流是在響應某種請求時創建的批作業。例如,我登錄進 Web 應用程序中,并要求它向系統中注冊的所有用戶發送一個消息,將一個新的特性告訴他們。這個操作必須進行批處理,因為系統中有 10,000 個用戶。PHP 要花費一段時間才能完成這樣的任務,所以它必須由瀏覽器之外的一個作業來執行。
在第二個工作流中,Web 應用程序只需將信息放在某個位置,讓批處理應用程序共享它。這些信息指定作業的性質(例如,“Send this e-mail to all the people on the system”。)批處理程序運行這個作業,然后刪除作業。另一種方法是,處理程序將作業標為已完成。無論用哪種方法,作業都應該識別為已完成,這樣就不會再次運行它。
本文的其余部分演示在 Web 應用程序前端和批處理后端之間共享數據的各種方法。
郵件隊列
第一種方法是使用專用的郵件隊列系統。在這種模型中,數據庫中的一個表包含應該發送給各個用戶的電子郵件消息。Web 界面使用mailouts 類將電子郵件添加到隊列中。電子郵件處理程序使用 mailouts 類檢索未處理的電子郵件,然后再次使用它從隊列中刪除未處理的電子郵件。
這個模型首先需要 MySQL 模式。
清單 1. mailout.sql
DROP TABLE IF EXISTS mailouts; CREATE TABLE mailouts ( id MEDIUMINT NOT NULL AUTO_INCREMENT, from_address TEXT NOT NULL, to_address TEXT NOT NULL, subject TEXT NOT NULL, content TEXT NOT NULL, PRIMARY KEY ( id ) );
這個模式非常簡單。每行中有一個 from 和一個 to 地址,以及電子郵件的主題和內容。
對數據庫中的 mailouts 表進行處理的是 PHP mailouts 類。
清單 2. mailouts.php
getMessage()); } return $db; } public static function delete( $id ) { $db = Mailouts::get_db(); $sth = $db->prepare( 'DELETE FROM mailouts WHERE id=?' ); $db->execute( $sth, $id ); return true; } public static function add( $from, $to, $subject, $content ) { $db = Mailouts::get_db(); $sth = $db->prepare( 'INSERT INTO mailouts VALUES (null,?,?,?,?)' ); $db->execute( $sth, array( $from, $to, $subject, $content ) ); return true; } public static function get_all() { $db = Mailouts::get_db(); $res = $db->query( "SELECT * FROM mailouts" ); $rows = array(); while( $res->fetchInto( $row ) ) { $rows []= $row; } return $rows; } } ?>
這個腳本包含 Pear::DB 數據庫訪問類。然后定義 mailouts 類,其中包含三個主要的靜態函數:add、delete 和 get_all。add() 方法向隊列中添加一個電子郵件,這個方法由前端使用。get_all() 方法從表中返回所有數據。delete() 方法刪除一個電子郵件。
您可能會問,我為什么不只在腳本末尾調用 delete_all() 方法。不這么做有兩個原因:如果在發送每個消息之后刪除它,那么即使腳本在出現問題之后重新運行,消息也不可能發送兩次;在批作業的啟動和完成之間可能會添加新的消息。
下一步是編寫一個簡單的測試腳本,這個腳本將一個條目添加到隊列中。
清單 3. mailout_test_add.php
在這個示例中,我添加一個 mailout,這個消息要發送給某公司的 Molly,其中包括主題 “Test Subject” 和電子郵件主體。可以在命令行上運行這個腳本:php mailout_test_add.php。
為了發送電子郵件,需要另一個腳本,這個腳本作為作業處理程序。
清單 4. mailout_send.php
這個腳本使用 get_all() 方法檢索所有電子郵件消息,然后使用 PHP 的 mail() 方法逐一發送消息。在每次成功發送電子郵件之后,調用delete() 方法從隊列中刪除對應的記錄。
使用 cron 守護進程定期運行這個腳本。運行這個腳本的頻率取決于您的應用程序的需要。
注意:PHP Extension and Application Repository(PEAR)存儲庫包含一個出色的 郵件隊列系統 實現,可以免費下載。
更通用的方法
專門用來發送電子郵件的解決方案是很不錯,但是是否有更通用的方法?我們需要能夠發送電子郵件、生成報告或者執行其他耗費時間的處理,而不必在瀏覽器中等待處理完成。
為此,可以利用一個事實:PHP 是一種解釋型語言。可以將 PHP 代碼存儲在數據庫中的隊列中,以后再執行它。這需要兩個表,見清單 5。
清單 5. generic.sql
DROP TABLE IF EXISTS processing_items; CREATE TABLE processing_items ( id MEDIUMINT NOT NULL AUTO_INCREMENT, function TEXT NOT NULL, PRIMARY KEY ( id ) ); DROP TABLE IF EXISTS processing_args; CREATE TABLE processing_args ( id MEDIUMINT NOT NULL AUTO_INCREMENT, item_id MEDIUMINT NOT NULL, key_name TEXT NOT NULL, value TEXT NOT NULL, PRIMARY KEY ( id ) );
第一個表 processing_items 包含作業處理程序調用的函數。第二個表 processing_args 包含要發送給函數的參數,采用的形式是由鍵/值對組成的 hash 表。
與 mailouts 表一樣,這兩個表也由 PHP 類包裝,這個類稱為 ProcessingItems。
清單 6. generic.php
prepare( 'DELETE FROM processing_args WHERE item_id=?' ); $db->execute( $sth, $id ); $sth = $db->prepare( 'DELETE FROM processing_items WHERE id=?' ); $db->execute( $sth, $id ); return true; } public static function add( $function, $args ) { $db = ProcessingItems::get_db(); $sth = $db->prepare( 'INSERT INTO processing_items VALUES (null,?)' ); $db->execute( $sth, array( $function ) ); $res = $db->query( "SELECT last_insert_id()" ); $id = null; while( $res->fetchInto( $row ) ) { $id = $row[0]; } foreach( $args as $key => $value ) { $sth = $db->prepare( 'INSERT INTO processing_args VALUES (null,?,?,?)' ); $db->execute( $sth, array( $id, $key, $value ) ); } return true; } public static function get_all() { $db = ProcessingItems::get_db(); $res = $db->query( "SELECT * FROM processing_items" ); $rows = array(); while( $res->fetchInto( $row ) ) { $item = array(); $item['id'] = $row[0]; $item['function'] = $row[1]; $item['args'] = array(); $ares = $db->query( "SELECT key_name, value FROM processing_args WHERE item_id=?", $item['id'] ); while( $ares->fetchInto( $arow ) ) $item['args'][ $arow[0] ] = $arow[1]; $rows []= $item; } return $rows; } } ?>
這個類包含三個重要的方法:add()、get_all() 和 delete()。與 mailouts 系統一樣,前端使用 add(),處理引擎使用 get_all() 和delete()。
清單 7 所示的測試腳本將一個條目添加到處理隊列中。
清單 7. generic_test_add.php
'foo' ) ); ?>
在這個示例中,添加了一個對 printvalue 函數的調用,并將 value 參數設置為 foo。我使用 PHP 命令行解釋器運行這個腳本,并將這個方法調用放進隊列中。然后使用以下處理腳本運行這個方法。
清單 8. generic_process.php
這個腳本非常簡單。它獲得 get_all() 返回的處理條目,然后使用 call_user_func_array(一個 PHP 內部函數)用給定的參數動態地調用這個方法。在這個示例中,調用本地的 printvalue 函數。
為了演示這種功能,我們看看在命令行上發生了什么:
% php generic_test_add.php % php generic_process.php Printing: foo %
輸出并不多,但是您能夠看出要點。通過這種機制,可以將任何 PHP 函數的處理推遲。
現在,如果您不喜歡將 PHP 函數名和參數放進數據庫中,那么另一種方法是在 PHP 代碼中建立數據庫中的 “處理作業類型” 名稱和實際 PHP 處理函數之間的映射。按照這種方式,如果以后決定修改 PHP 后端,那么只要 “處理作業類型” 字符串匹配,系統就仍然可以工作。
放棄數據庫
最后,我演示另一種稍有不同的解決方案,它使用一個目錄中的文件來存儲批作業,而不是使用數據庫。在這里提供這個思路并不是建議您 “采用這種方式,而不使用數據庫”,這只是一種可供選擇的方式,是否采用它由您決定。
顯然,這個解決方案中沒有模式,因為我們不使用數據庫。所以先編寫一個類,它包含與前面示例中相似的 add()、get_all() 和 delete()方法。
清單 9. batch_by_file.php
$v ) { fprintf( $fh, $k.":".$v."\n" ); } fclose( $fh ); return true; } public static function get_all() { $rows = array(); if (is_dir(BATCH_DIRECTORY)) { if ($dh = opendir(BATCH_DIRECTORY)) { while (($file = readdir($dh)) !== false) { $path = BATCH_DIRECTORY.$file; if ( is_dir( $path ) == false ) { $item = array(); $item['id'] = $path; $fh = fopen( $path, 'r' ); if ( $fh ) { $item['function'] = trim(fgets( $fh )); $item['args'] = array(); while( ( $line = fgets( $fh ) ) != null ) { $args = split( ':', trim($line) ); $item['args'][$args[0]] = $args[1]; } $rows []= $item; fclose( $fh ); } } } closedir($dh); } } return $rows; } } ?>
BatchFiles 類有三個主要方法:add()、get_all() 和 delete()。這個類不訪問數據庫,而是讀寫 batch_items 目錄中的文件。
使用以下測試代碼添加新的批處理條目。
清單 10. batch_by_file_test_add.php
'foo' ) ); ?>
有一點需要注意:除了類名(BatchFiles)之外,實際上沒有任何跡象能夠說明作業是如何存儲的。所以,以后很容易將它改為數據庫風格的存儲方式,而不需要修改接口。
最后是處理程序的代碼。
清單 11. batch_by_file_processor.php
這段代碼幾乎與數據庫版本完全相同,只是修改了文件名和類名。
結語
正如前面提到的,服務器對線程提供了許多支持,可以進行后臺批處理。在某些情況下,使用輔助線程處理小作業肯定比較容易。但是,也可以使用傳統工具(cron、MySQL、標準的面向對象的 PHP 和 Pear::DB)在 PHP 應用程序中創建批作業,這很容易實現、部署和維護。