Commit cf2aff6a authored by 734642908@qq.com's avatar 734642908@qq.com

使用redis存储数据,增加集群功能

parent b9453b60
.idea/
.settings/
.vscode/
.project
Application/Runtime
%SystemDrive%/
dump.rdb
uploadfile
\ No newline at end of file
...@@ -11,12 +11,18 @@ swoole框架websocket ...@@ -11,12 +11,18 @@ swoole框架websocket
* │ ├─common.php 应用公共(函数)文件 * │ ├─common.php 应用公共(函数)文件
* ├─extend 扩展类库目录(可定义) * ├─extend 扩展类库目录(可定义)
* ├─swoole swoole框架目录(可更改) * ├─swoole swoole框架目录(可更改)
* ├─server.php 服务启动文件 * ├─temp 运行目录
* ├─swoole.log 运行log文件 * ├─start.php 服务启动文件
* ├─stop.php 服务停止文件
* ├─README.md README 文件 * ├─README.md README 文件
#服务操作:
php start.php
php stop.php
#请求协议: #请求协议:
{"a":"login","m":"user","d":{"uid":"936258"}} {"a":"set_info","m":"user","d":{"token":"xxxxx"}}
a:操作方法 m:模块名称 d:请求数据 a:操作方法 m:模块名称 d:请求数据
...@@ -25,4 +31,5 @@ a:操作方法 m:模块名称 d:请求数据 ...@@ -25,4 +31,5 @@ a:操作方法 m:模块名称 d:请求数据
c:状态码 msg:状态描述 d:数据内容 c:状态码 msg:状态描述 d:数据内容
websocket测试地址:http://193.112.57.224:9501/ http测试地址:http://193.112.57.224:9501/
\ No newline at end of file websocket测试地址:ws://193.112.57.224:9501/
\ No newline at end of file
...@@ -12,14 +12,14 @@ function __autoload($class){ ...@@ -12,14 +12,14 @@ function __autoload($class){
function echo_log($data,$data2=''){ function echo_log($data,$data2=''){
//如果打开了调试 //如果打开了调试
if(is_array($data) || is_object($data)) { if(is_array($data) || is_object($data)) {
var_dump(date("Y-m-d H:i ") . $data); var_dump(date("Y-m-d H:i:s") . $data);
} else { } else {
echo(date("Y-m-d H:i ") . $data); echo(date("Y-m-d H:i:s") . $data);
} }
echo("\r\n"); echo("\r\n");
if (!empty($data2)) { if (!empty($data2)) {
if(is_array($data2) || is_object($data2)) { if(is_array($data2) || is_object($data2)) {
var_dump(date("Y-m-d H:i ") . $data2); var_dump(date("Y-m-d H:i:s") . $data2);
} else { } else {
echo($data2); echo($data2);
} }
...@@ -41,8 +41,6 @@ function post_curl($url,$data,$header = array(), $is_ssl = false) { ...@@ -41,8 +41,6 @@ function post_curl($url,$data,$header = array(), $is_ssl = false) {
curl_setopt($ch, CURLOPT_POST, 1); // 发送一个常规的Post请求 curl_setopt($ch, CURLOPT_POST, 1); // 发送一个常规的Post请求
curl_setopt($ch, CURLOPT_POSTFIELDS, $data); // Post提交的数据包 curl_setopt($ch, CURLOPT_POSTFIELDS, $data); // Post提交的数据包
} }
$header[]= "Sercet: edf_a3d1bf7760f3485aa2f7bd4a3124db71";
$header[]= "Time: ".date('YmdHis');
if ($header) { if ($header) {
curl_setopt($ch,CURLOPT_HTTPHEADER,$header); curl_setopt($ch,CURLOPT_HTTPHEADER,$header);
} }
...@@ -99,22 +97,88 @@ function decryptToken($token) ...@@ -99,22 +97,88 @@ function decryptToken($token)
} }
} }
/**
*功能:加密解密函数
*@param string $string 要加密字符串
*@param string $operation 动作 DECODE 解密, ENCODE 加密
*@param string $key 加密key
*@param int expiry
*@return string 加密或解密的字符串
*/
function uc_authcode($string, $operation = 'DECODE', $key = '', $expiry = 0) {
$ckey_length = 4;
$key = md5($key ? $key : '12R,sf2$1U5)kk#UI9.wiu');
$keya = md5(substr($key, 0, 16));
$keyb = md5(substr($key, 16, 16));
$keyc = $ckey_length ? ($operation == 'DECODE' ? substr($string, 0, $ckey_length): substr(md5(microtime()), -$ckey_length)) : '';
$cryptkey = $keya.md5($keya.$keyc);
$key_length = strlen($cryptkey);
$string = $operation == 'DECODE' ? base64_decode(substr($string, $ckey_length)) : sprintf('%010d', $expiry ? $expiry + time() : 0).substr(md5($string.$keyb), 0, 16).$string;
$string_length = strlen($string);
$result = '';
$box = range(0, 255);
$rndkey = array();
for($i = 0; $i <= 255; $i++) {
$rndkey[$i] = ord($cryptkey[$i % $key_length]);
}
for($j = $i = 0; $i < 256; $i++) {
$j = ($j + $box[$i] + $rndkey[$i]) % 256;
$tmp = $box[$i];
$box[$i] = $box[$j];
$box[$j] = $tmp;
}
for($a = $j = $i = 0; $i < $string_length; $i++) {
$a = ($a + 1) % 256;
$j = ($j + $box[$a]) % 256;
$tmp = $box[$a];
$box[$a] = $box[$j];
$box[$j] = $tmp;
$result .= chr(ord($string[$i]) ^ ($box[($box[$a] + $box[$j]) % 256]));
}
if($operation == 'DECODE') {
if((substr($result, 0, 10) == 0 || substr($result, 0, 10) - time() > 0) && substr($result, 10, 16) == substr(md5(substr($result, 26).$keyb), 0, 16)) {
return substr($result, 26);
} else {
return '';
}
} else {
return $keyc.str_replace('=', '', base64_encode($result));
}
}
/***********************************socket公用方法start********************************************************/ /***********************************socket公用方法start********************************************************/
//根据用户ID查用户的连接记录 //type为true时server地址+端口号 OR server地址
function host_prefix($type = true) {
$re = $GLOBALS['config']['MAIN_SERVER']['IP'].':'.$GLOBALS['config']['MAIN_SERVER']['PORT'];
if($type) {
$re .= ':';
}
return $re;
}
//根据用户UID查用户的连接记录
function get_user_fd($uid){ function get_user_fd($uid){
if($GLOBALS['USER_LIST']->exist($uid)){ $uidJson = $GLOBALS['redisDB']->hget($GLOBALS['config']['HASH_TABLE']['UID'],$uid);
$tmp_val = $GLOBALS['USER_LIST']->get($uid); if($uidJson) {
return $tmp_val['fd']; $uidData = json_decode($uidJson,true);
}else{ return $uidData['fd'];
} else {
return false; return false;
} }
} }
//根据用户的fd查用户的uid //根据用户的fd查用户的uid
function get_user_uid($fd){ function get_user_uid($fd){
if($GLOBALS['USER_LIST_FD']->exist($fd)){ $fdJson = $GLOBALS['redisDB']->hget($GLOBALS['config']['HASH_TABLE']['FD'],host_prefix().$fd);
$tmp_val = $GLOBALS['USER_LIST_FD']->get($fd); if($fdJson) {
return $tmp_val['uid']; $fdData = json_decode($fdJson,true);
}else{ return $fdData['uid'];
} else {
return false; return false;
} }
} }
...@@ -128,20 +192,28 @@ $data = array( ...@@ -128,20 +192,28 @@ $data = array(
*/ */
function set_user_list($data){ function set_user_list($data){
//删掉这个uid之前绑定的连接 //删掉这个uid之前绑定的连接
$tmp_uid = $GLOBALS['USER_LIST']->get($data['uid']); $uidJson = $GLOBALS['redisDB']->hget($GLOBALS['config']['HASH_TABLE']['UID'],$data['uid']);
if($tmp_uid && $tmp_uid['fd']>0){ if($uidJson){
$GLOBALS['USER_LIST_FD']->del($tmp_uid['fd']); $uidData = json_decode($uidJson,true);
$GLOBALS['USER_LIST']->del($data['uid']); $GLOBALS['redisDB']->hdel($GLOBALS['config']['HASH_TABLE']['UID'],$data['uid']);
$GLOBALS['redisDB']->hdel($GLOBALS['config']['HASH_TABLE']['FD'],host_prefix().$uidData['fd']);
} }
$GLOBALS['USER_LIST']->set($data['uid'],array('fd'=>$data['fd']));
$GLOBALS['USER_LIST_FD']->set($data['fd'],array('uid'=>$data['uid'])); $setData = array(
'fd'=>$data['fd']
,'uid'=>$data['uid']
,'socket'=>host_prefix(false)
);
$setData = json_encode($setData);
$GLOBALS['redisDB']->hset($GLOBALS['config']['HASH_TABLE']['UID'],$data['uid'],$setData);
$GLOBALS['redisDB']->hset($GLOBALS['config']['HASH_TABLE']['FD'],host_prefix().$data['fd'],$setData);
return true; return true;
} }
//删除用户在线数据 //删除用户在线数据
function del_user_list($fd){ function del_user_list($fd) {
$GLOBALS['USER_LIST']->del(get_user_uid($fd)); $re = $GLOBALS['redisDB']->hdel($GLOBALS['config']['HASH_TABLE']['UID'],get_user_uid($fd));
$GLOBALS['USER_LIST_FD']->del($fd); $re1 = $GLOBALS['redisDB']->hdel($GLOBALS['config']['HASH_TABLE']['FD'],host_prefix().$fd);
return true; return true;
} }
...@@ -181,13 +253,29 @@ function send_group($server,$arr,$data,$opcode=1){ ...@@ -181,13 +253,29 @@ function send_group($server,$arr,$data,$opcode=1){
//给所有用户发信息 //给所有用户发信息
function send_all($server,$data,$opcode=1,$finish=true){ function send_all($server,$data,$opcode=1,$finish=true){
if(!empty($GLOBALS['USER_LIST'])){ $userList = $GLOBALS['redisDB']->HGETALL($GLOBALS['config']['HASH_TABLE']['UID']);
foreach($GLOBALS['USER_LIST'] as $fd){ if(!empty($userList)){
$server->push($fd['fd'],$data,$opcode,$finish); foreach($userList as $key=>$val){
$arr = json_decode($val,true);
if(host_prefix(false) == $arr['socket']) {
//发送消息给接收者
$server->push($arr['fd'],json_encode($data),$opcode,$finish);
} else {
$postData = array(
'to_uid'=>$key
,'from_uid'=>'系统广播'
,'content'=>$data['d']['content']
);
$postUrl = $arr['socket'].'?event=http_send_msg';
$finish = post_curl($postUrl,$postData);
}
if($finish){ if($finish){
echo_log('用户['. $fd['fd'] .']发送消息成功!\r\n'); echo_log('用户['. $key .']发送消息成功!\r\n');
}else{ }else{
echo_log('用户['. $fd['fd'] .']的客户端已经断开,无法发送消息!\r\n'); echo_log('用户['. $key .']的客户端已经断开,无法发送消息!\r\n');
} }
} }
}else{ }else{
......
<?php <?php
$GLOBALS['config'] = array( $GLOBALS['config'] = array(
'SERVER_NAME'=>"test", 'SERVER_NAME'=>"test",
'HASH_TABLE'=>[
'UID'=>'test_uid_list'
,'FD'=>'test_fd_list'
],
'MAIN_SERVER'=>[ 'MAIN_SERVER'=>[
'HOST'=>'0.0.0.0', 'HOST'=>'0.0.0.0',
'PORT'=>9502, 'IP'=>'193.112.57.224',
'PORT'=>9501,
'SETTING'=>[ 'SETTING'=>[
'open_tcp_nodelay' => 1, 'open_tcp_nodelay' => 1,
'open_cpu_affinity' => 1, 'open_cpu_affinity' => 1,
'task_worker_num' =>8, 'task_worker_num' =>8,
'dispatch_mode'=>2, 'dispatch_mode'=>2,
'daemonize' => 0,//守护进程化。设置daemonize => 1时,程序将转入后台作为守护进程运行。长时间运行的服务器端程序必须启用此项。 'daemonize' => 1,//守护进程化。设置daemonize => 1时,程序将转入后台作为守护进程运行。长时间运行的服务器端程序必须启用此项。
'reactor_num' => 2,//2个处理线程的数量 'reactor_num' => 2,//2个处理线程的数量
'worker_num' => 4,//4个线程 'worker_num' => 4,//4个线程
'max_request' => 0,//最大10万个连接请求 'max_request' => 0,//最大10万个连接请求
// 'heartbeat_idle_time' => 60,//如果客户端超过600秒未向服务器发消息 // 'heartbeat_idle_time' => 60,//如果客户端超过600秒未向服务器发消息
// 'heartbeat_check_interval' =>61,//每60秒检查一次所有的连接 // 'heartbeat_check_interval' =>61,//每60秒检查一次所有的连接
'pid_file' => ROOT_PATH.'temp/server.pid',
'log_file' => ROOT_PATH.'swoole.log' 'log_file' => ROOT_PATH.'temp/swoole.log'
] ]
], ],
'REDIS'=>array( 'REDIS'=>array(
......
...@@ -18,9 +18,26 @@ class user extends base { ...@@ -18,9 +18,26 @@ class user extends base {
} }
public function show_data()
{
$uidList = $GLOBALS['redisDB']->HGETALL($GLOBALS['config']['HASH_TABLE']['UID']);
$fdList = $GLOBALS['redisDB']->HGETALL($GLOBALS['config']['HASH_TABLE']['FD']);
$reData = json_encode(array('c'=>1,'msg'=>'success','d'=>array('host'=>host_prefix(false).'99999','uidList'=>$uidList,'fdList'=>$fdList)));
$this->server->push($this->fd, $reData);
}
public function del_uid()
{
$re = $GLOBALS['redisDB']->del($GLOBALS['config']['HASH_TABLE']['UID']);
$re1 = $GLOBALS['redisDB']->del($GLOBALS['config']['HASH_TABLE']['FD']);
$reData = json_encode(array('c'=>1,'msg'=>'success','d'=>array('re'=>$re,'re1'=>$re1)));
$this->server->push($this->fd, $reData);
}
public function online_num() public function online_num()
{ {
$reData = json_encode(array('c'=>1,'msg'=>'success','d'=>array('num'=>count($GLOBALS['USER_LIST'])))); $num = $GLOBALS['redisDB']->HLEN($GLOBALS['config']['HASH_TABLE']['UID']);
$reData = json_encode(array('c'=>1,'msg'=>'success','d'=>array('num'=>$num)));
$this->server->push($this->fd, $reData); $this->server->push($this->fd, $reData);
} }
...@@ -45,7 +62,7 @@ class user extends base { ...@@ -45,7 +62,7 @@ class user extends base {
,'msg'=>'token不正确' ,'msg'=>'token不正确'
); );
} else { } else {
set_user_list(array('fd'=>$frame->fd,'uid'=>$token['uid'])); set_user_list(array('fd'=>$this->fd,'uid'=>$token['uid']));
$result = array( $result = array(
'c'=>1 'c'=>1
,'msg'=>'保存成功' ,'msg'=>'保存成功'
...@@ -67,20 +84,25 @@ class user extends base { ...@@ -67,20 +84,25 @@ class user extends base {
$consignee = array( $consignee = array(
'c'=>1 'c'=>1
,'msg'=>'接收到广播信息' ,'msg'=>'接收到广播信息'
,'d'=>array('sender'=>get_user_uid($this->fd),'content'=>$this->reqData['content']) ,'d'=>array('sender'=>'系统广播','content'=>$this->reqData['content'])
); );
send_all($this->server,json_encode($consignee)); send_all($this->server,$consignee);
//发送成功,通知发送者 //发送成功,通知发送者
$sender = array( $result = array(
'c'=>1 'c'=>1
,'msg'=>'信息发送成功' ,'msg'=>'信息发送成功'
); );
$this->server->push($this->fd, json_encode($sender)); if(!empty($this->fd)) {
return 'success'; $this->server->push($this->fd, json_encode($result));
}
return json_encode($result);
} }
if(!empty($this->fd)) {
$this->server->push($this->fd, json_encode($result)); $this->server->push($this->fd, json_encode($result));
return 'fail'; }
return json_encode($result);
} }
//给指定用户发送消息 //给指定用户发送消息
...@@ -90,6 +112,11 @@ class user extends base { ...@@ -90,6 +112,11 @@ class user extends base {
'c'=>2 'c'=>2
,'msg'=>'to_uid参数不存在' ,'msg'=>'to_uid参数不存在'
); );
} else if(empty($this->reqData['from_uid'])) {
$result = array(
'c'=>2
,'msg'=>'from_uid参数不存在'
);
} else if(empty($this->reqData['content'])) { } else if(empty($this->reqData['content'])) {
$result = array( $result = array(
'c'=>2 'c'=>2
...@@ -97,18 +124,28 @@ class user extends base { ...@@ -97,18 +124,28 @@ class user extends base {
); );
} else { } else {
$uidJson = $GLOBALS['redisDB']->hget($GLOBALS['config']['HASH_TABLE']['UID'],$this->reqData['to_uid']);
if($uidJson) {
$uidData = json_decode($uidJson,true);
if(host_prefix(false) == $uidData['socket']) {
//发送消息给接收者 //发送消息给接收者
$consignee = array( $consignee = array(
'c'=>1 'c'=>1
,'msg'=>'接收到新信息' ,'msg'=>'接收到新信息'
,'d'=>array('sender'=>$this->reqData['to_uid'],'content'=>$this->reqData['content']) ,'d'=>array('sender'=>$this->reqData['from_uid'],'content'=>$this->reqData['content'])
); );
if(is_array($this->reqData['to_uid'])) {
$re1 = send_group($this->server,$this->reqData['to_uid'],json_encode($consignee));
} else {
$re1 = send_user($this->server,$this->reqData['to_uid'],json_encode($consignee)); $re1 = send_user($this->server,$this->reqData['to_uid'],json_encode($consignee));
} else {
$postData = array(
'to_uid'=>$this->reqData['to_uid']
,'from_uid'=>$this->reqData['from_uid']
,'content'=>$this->reqData['content']
);
$postUrl = $uidData['socket'].'?event=http_send_msg';
$re1 = post_curl($postUrl,$postData);
} }
if($re1) { if($re1) {
//发送成功,通知发送者 //发送成功,通知发送者
$sender = array( $sender = array(
...@@ -125,9 +162,63 @@ class user extends base { ...@@ -125,9 +162,63 @@ class user extends base {
,'msg'=>'信息发送失败' ,'msg'=>'信息发送失败'
); );
} }
} else {
$result = array(
'c'=>2
,'msg'=>'to_uid参数错误'
);
}
} }
if(!empty($this->fd)) {
$this->server->push($this->fd, json_encode($result)); $this->server->push($this->fd, json_encode($result));
return json_encode($sender);
} }
return json_encode($result);
}
//通过http推送消息给用户
public function http_send_msg() {
if(empty($this->reqData['to_uid'])) {
$result = array(
'c'=>2
,'msg'=>'to_uid参数不存在'
);
} else if(empty($this->reqData['from_uid'])) {
$result = array(
'c'=>2
,'msg'=>'from_uid参数不存在'
);
} else if(empty($this->reqData['content'])) {
$result = array(
'c'=>2
,'msg'=>'content参数不存在'
);
} else {
//发送消息给接收者
$consignee = array(
'c'=>1
,'msg'=>'接收到新信息'
,'d'=>array('sender'=>$this->reqData['from_uid'],'content'=>$this->reqData['content'])
);
$re1 = send_user($this->server,$this->reqData['to_uid'],json_encode($consignee));
if($re1) {
//发送成功,通知发送者
$sender = array(
'c'=>1
,'msg'=>'信息发送成功'
);
return true;
} else {
$result = array(
'c'=>2
,'msg'=>'信息发送失败'
);
}
}
return false;
}
} }
\ No newline at end of file
<?php
//在线用户列表
$GLOBALS['USER_LIST'] = new swoole_table(20480);
$GLOBALS['USER_LIST']->column('fd', swoole_table::TYPE_INT, 4);
$GLOBALS['USER_LIST']->create();
//在线用户连接列表
$GLOBALS['USER_LIST_FD'] = new swoole_table(20480);
$GLOBALS['USER_LIST_FD']->column('uid', swoole_table::TYPE_STRING, 15);
$GLOBALS['USER_LIST_FD']->create();
//任务数
$GLOBALS["TASK_LIST"] = new swoole_table(20480);
$GLOBALS['TASK_LIST']->column('task',swoole_table::TYPE_INT, 8);
$GLOBALS['TASK_LIST']->create();
// 定义根目录
define('ROOT_PATH',dirname(__FILE__).'/');
// 引入socketServer文件
require './swoole/socketServer.php';
$server = new socketServer($GLOBALS['config']['MAIN_SERVER']['HOST'],$GLOBALS['config']['MAIN_SERVER']['PORT']);
\ No newline at end of file
<?php
//开启服务
// 定义根目录
define('ROOT_PATH',dirname(__FILE__).'/');
// 引入socketServer文件
require './swoole/socketServer.php';
$server = new socketServer($GLOBALS['config']['MAIN_SERVER']['HOST'],$GLOBALS['config']['MAIN_SERVER']['PORT']);
\ No newline at end of file
<?php
//停止服务
$pid = file_get_contents('./temp/server.pid');
shell_exec("kill -9 $pid");
\ No newline at end of file
...@@ -14,6 +14,8 @@ class socketWork { ...@@ -14,6 +14,8 @@ class socketWork {
} }
public function onWorkerStop( $server , $worker_id) { public function onWorkerStop( $server , $worker_id) {
// $GLOBALS['redisDB']->del($GLOBALS['config']['HASH_TABLE']['UID']);
// $GLOBALS['redisDB']->del($GLOBALS['config']['HASH_TABLE']['FD']);
if ($GLOBALS['redisDB']) { if ($GLOBALS['redisDB']) {
$GLOBALS['redisDB']->close(); $GLOBALS['redisDB']->close();
} }
...@@ -23,14 +25,16 @@ class socketWork { ...@@ -23,14 +25,16 @@ class socketWork {
} }
public function onWorkerError( $server , $worker_id) { public function onWorkerError( $server , $worker_id) {
// $GLOBALS['redisDB']->del($GLOBALS['config']['HASH_TABLE']['UID']);
// $GLOBALS['redisDB']->del($GLOBALS['config']['HASH_TABLE']['FD']);
echo _log('work进程报错'); echo _log('work进程报错');
} }
public function onOpen($server, $frame) { public function onOpen($server, $frame) {
$uid = 'fd'.$frame->fd; // $uid = 'fd'.$frame->fd;
set_user_list(array('fd'=>$frame->fd,'uid'=>$uid)); // set_user_list(array('fd'=>$frame->fd,'uid'=>$uid));
$reData = json_encode(array('c'=>1,'msg'=>'websocket连接成功','d'=>array('uid'=>$uid))); $reData = json_encode(array('c'=>1,'msg'=>'websocket连接成功'));
$server->push($frame->fd, $reData); $server->push($frame->fd, $reData);
} }
...@@ -66,6 +70,28 @@ class socketWork { ...@@ -66,6 +70,28 @@ class socketWork {
$act->setData($data); $act->setData($data);
$result = $act->send_msg(); $result = $act->send_msg();
$response->write($result);
} else if($event == 'broadcast') {
//发送广播
$data = array(
'a'=>'broadcast'
,'m'=>'user'
,'d'=>$request->post
);
$tmp_task_id = $this->server->task($data);
echo_log("进行task任务:".$tmp_task_id);
$response->write(json_encode(array('c'=>1,'msg'=>'广播任务投递成功')));
} else if($event == 'http_send_msg') {
//内部转发消息使用,不对外开放(集群部署使用)
$act = new user();
$data = array(
'server'=>$server
,'d'=>$request->post
);
$act->setData($data);
$result = $act->http_send_msg();
$response->write($result); $response->write($result);
} else { } else {
//默认返回 //默认返回
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment