diff --git a/push-server/README.md b/push-server/README.md index 8e5d35e..e16a97e 100644 --- a/push-server/README.md +++ b/push-server/README.md @@ -89,7 +89,7 @@ server ##HTTP API -string[]类型,表示http协议中list类型参数,如 get?uid=123&uid=456 ,表示一个uid数组 [123, 456]. get?uid=123 表示单个uid数组 [123] +string[]类型,表示http协议中list类型参数,如 get?pushId=123&pushId=456 ,表示一个pushId数组 [123, 456]. get?pushId=123 表示单个pushId数组 [123] ### /api/push 应用内透传 @@ -101,8 +101,6 @@ pushAll -> string, true表示推送全网,其它或者留空表示单个推送 pushId -> string[], 客户端生成的随机ID,单个或者数组 -uid -> string[] 通过addPushIdToUid接口绑定的uid - --- 以上参数3选一,指定推送对象 data -> string, base64编码的二进制数据 @@ -118,8 +116,6 @@ pushAll -> string, true表示推送全网,其它或者留空表示单个推送 pushId -> string[], 客户端生成的随机ID,单个或者数组 -uid -> string[], 通过addPushIdToUid接口绑定的uid - --- 以上参数3选一,指定推送对象 notification -> 通知消息内容 需要url encode @@ -146,12 +142,3 @@ badge(ios) - (apn对应的badge字段) 可选 sound(ios) - (apn对应的sound字段) 可选 payload - 发送给应用非显示用的透传信息, 需要是一个json map - - -### /api/addPushIdToUid 绑定UID和pushId - -http://yourip:11001/api/addPushIdToUid?pushId=abc&uid=123 - -pushId -> string,客户端生成的随机ID - -uid -> string,服务器需要绑定的UID diff --git a/push-server/bin/push-server b/push-server/bin/push-server index b0cab4e..a6337ee 100755 --- a/push-server/bin/push-server +++ b/push-server/bin/push-server @@ -1,5 +1,17 @@ #!/bin/bash +if [ -f "pid/pid" ] + then + kill `cat pid/pid` + echo "killing `cat pid/pid`" + while [[ ${?} == 0 ]] # Repeat until the process has terminated. + do + sleep 0.1 # Wait a bit before testing. + ps -p `cat pid/pid` >/dev/null # Check if the process has terminated. + done + rm -rf pid/pid + fi + mkdir -p log mkdir -p pid @@ -7,7 +19,7 @@ ifconfig eth0 | grep 'inet addr:' | cut -d: -f2 | awk '{ print $1}' > ip # A POSIX variable OPTIND=1 # Reset in case getopts has been used previously in the shell. - +ARGS="$@" # Initialize our own variables: COUNT=1 DEBUG=0 @@ -39,14 +51,14 @@ shift $((OPTIND-1)) echo "DEBUG=$DEBUG, COUNT='$COUNT',FOREGROUND='$FOREGROUND', Leftovers: $@" -if [ $DEBUG = 1 ]; then +if [ $DEBUG = 1 -a $FOREGROUND = 1 ]; then export DEBUG=apn,apn:socket,apn:trace,NotificationService,RestApi,TTLService,AdminCommand,ApiThreshold,ProxyServer,ApnService LOG_FILE=debug.log -elif [ $VERBOSE = 1 ]; then +elif [ $VERBOSE = 1 -a $FOREGROUND = 1 ]; then export DEBUG=SimpleRedisHashCluster,apn,Stats,socket.io*,ProxyServer,PacketService,NotificationService,RestApi,TTLService,Sentinel,AdminCommand,ApiThreshold,ApnService LOG_FILE=verbose.log else - export DEBUG= + unset DEBUG fi BASEDIR=$(dirname $0) @@ -54,23 +66,9 @@ BASEDIR=$(dirname $0) node_script="$BASEDIR/node-push-server" if [ $FOREGROUND = 1 ]; then - $node_script "$@" + $node_script $ARGS exit fi -for (( c=1; c<=COUNT; c++ )) -do - if [ -f "pid/$c.pid" ] - then - kill `cat pid/$c.pid` - echo "killing #$c" - while [[ ${?} == 0 ]] # Repeat until the process has terminated. - do - sleep 0.1 # Wait a bit before testing. - ps -p `cat pid/$c.pid` >/dev/null # Check if the process has terminated. - done - rm pid/$c.pid - fi - echo "starting instance #$c in background" - nohup $node_script "$@" -i $c >> $LOG_DIR/$c.log 2>&1 & echo $! > pid/$c.pid -done \ No newline at end of file + echo "starting push-server in background" + nohup $node_script $ARGS >> $LOG_DIR/console.log 2>&1 & echo $! > pid/pid diff --git a/push-server/config.js b/push-server/config.js index 09bdca4..0b1f175 100644 --- a/push-server/config.js +++ b/push-server/config.js @@ -6,38 +6,63 @@ config.pingInterval = 25000; config.apns = [ { production: false, + maxConnections: 100, bundleId: "com.xuduo.pushtest", cert: process.cwd() + "/cert/com.xuduo.pushtest/cert.pem", key: process.cwd() + "/cert/com.xuduo.pushtest/key.pem" }, { production: false, + maxConnections: 50, bundleId: "com.xuduo.pushtest2", cert: process.cwd() + "/cert/com.xuduo.pushtest2/cert.pem", key: process.cwd() + "/cert/com.xuduo.pushtest2/key.pem" } ]; +config.apnsSliceServers = [ + "http://localhost:11001", + "http://localhost:11002" +]; + +config.apiAuth = function (path, req, logger) { + var ip = req.headers['x-real-ip'] || req.connection.remoteAddress; + logger.info("%s caller ip %s", path, ip); + return true; +} + + config.redis = { - masters: [ + pubs: [ + [ + { + host: "127.0.0.1", + port: 6379 + } + ] + ], + write: [ { host: "127.0.0.1", port: 6379 } ], - slaves: [ + read: [ { host: "127.0.0.1", - port: 6379 + port: 6380 + } + ], + sub: [ + { + host: "127.0.0.1", + port: 6380 } ] - }; - config.io_port = 10001; config.api_port = 11001; - module.exports = config; diff --git a/push-server/index.js b/push-server/index.js index 71a00ba..ef0e2b2 100644 --- a/push-server/index.js +++ b/push-server/index.js @@ -3,7 +3,10 @@ var program = require('commander'); program .version('0.0.3') .usage('[options] ') - .option('-d --debug', 'debug output') + .option('-d', 'debug output') + .option('-f', 'foreground') + .option('-v', 'verbose') + .option('-i', 'info') .option('-c, --count ', 'process count to start', parseInt) .parse(process.argv); @@ -19,12 +22,17 @@ if (cluster.isMaster) { for (var i = 0; i (Date.now() - 10 * 1000)) { - Logger.log("info", "too many call dropping %s", topic); + logger.info("too many call dropping %s", topic); call = false; } doPush(redis, topic, call, threshold, callback); @@ -26,7 +26,7 @@ ApiThreshold.prototype.checkPushDrop = function (topic, callback) { ApiThreshold.prototype.setThreshold = function (topic, threshold) { if (threshold == 0) { delete this.watchedTopics[topic]; - Logger.log("info", "remove ApiThreshold %s %s", topic, threshold); + logger.info("remove ApiThreshold %s %s", topic, threshold); } else { var fakeValues = []; var fakeTime = Date.now() - 20 * 1000; @@ -37,14 +37,13 @@ ApiThreshold.prototype.setThreshold = function (topic, threshold) { this.redis.lpush(key, fakeValues); this.redis.ltrim(key, 0, threshold - 1); this.watchedTopics[topic] = threshold; - Logger.log("info", "set ApiThreshold %s %s", topic, threshold); + logger.info("set ApiThreshold %s %s", topic, threshold); } } function doPush(redis, topic, call, threshold, callback) { if (call && threshold) { - Logger.log("info", 7); var key = "apiThreshold#callTimestamp#" + topic; redis.lpush(key, Date.now()); redis.ltrim(key, 0, threshold - 1); diff --git a/push-server/lib/api/restApi.js b/push-server/lib/api/restApi.js index 9457454..89efb13 100644 --- a/push-server/lib/api/restApi.js +++ b/push-server/lib/api/restApi.js @@ -1,22 +1,28 @@ module.exports = RestApi; +var restify = require('restify'); +var debug = require('debug')('RestApi'); +var logger = require('../log/index.js')('RestApi'); -function RestApi(io, stats, notificationService, port, uidStore, ttlService, redis, apiThreshold, apnService) { +function RestApi(io, topicOnline, stats, notificationService, port, ttlService, redis, apiThreshold, apnService, apiAuth) { - var restify = require('restify'); + if (!(this instanceof RestApi)) return new RestApi(io, topicOnline, stats, notificationService, port, ttlService, redis, apiThreshold, apnService, apiAuth); + + var self = this; + + this.apiAuth = apiAuth; var server = restify.createServer({ name: 'myapp', version: '1.0.0' }); - var Logger = require('../log/index.js')('RestApi'); - server.on('uncaughtException', function (req, res, route, err) { try { - Logger.log("error", "RestApi uncaughtException " + err.stack + " \n params: \n" + JSON.stringify(req.params)); + logger.error("RestApi uncaughtException " + err.stack + " \n params: \n" + JSON.stringify(req.params)); + res.statusCode = 500; res.send({code: "error", message: "exception " + err.stack}); } catch (err) { - Logger.log("error", "RestApi uncaughtException catch " + err.stack); + logger.error("RestApi uncaughtException catch " + err.stack); } }); @@ -33,8 +39,6 @@ function RestApi(io, stats, notificationService, port, uidStore, ttlService, red server.get(/^\/notification\/?.*/, staticConfig); - server.get(/^\/uid\/?.*/, staticConfig); - server.get(/^\/handleStatsBase\/?.*/, staticConfig); server.get(/^\/stats\/?.*/, staticConfig); @@ -44,20 +48,27 @@ function RestApi(io, stats, notificationService, port, uidStore, ttlService, red server.get("/", staticConfig); var handlePush = function (req, res, next) { + if (self.apiAuth && !self.apiAuth("/api/push", req, logger)) { + logger.error("push denied %j %j", req.params, req.headers); + res.statusCode = 400; + res.send({code: "error", message: 'not authorized'}); + return next(); + } var topic = req.params.topic; if (!topic) { + res.statusCode = 400; res.send({code: "error", message: 'topic is required'}); return next(); } var data = req.params.data; if (!data) { + res.statusCode = 400; res.send({code: "error", message: 'data is required'}); return next(); } var pushId = req.params.pushId; var pushAll = req.params.pushAll; - var uid = req.params.uid; - Logger.log("debug", "push %s", JSON.stringify(req.params)); + logger.info("push %j", req.params); var pushData = {topic: topic, data: data}; var timeToLive = parseInt(req.params.timeToLive); @@ -68,9 +79,9 @@ function RestApi(io, stats, notificationService, port, uidStore, ttlService, red ttlService.addPacketAndEmit(topic, 'push', timeToLive, pushData, io, false); res.send({code: "success"}); } else { + res.statusCode = 400; res.send({code: "error", message: "call threshold exceeded"}); } - }); return next(); } else { @@ -87,45 +98,32 @@ function RestApi(io, stats, notificationService, port, uidStore, ttlService, red return next(); } } else { - if (uid) { - if (typeof uid === 'string') { - uidStore.getPushIdByUid(uid, function (pushIds) { - pushIds.forEach(function (id) { - ttlService.addPacketAndEmit(id, 'push', timeToLive, pushData, io, true); - }); - res.send({code: "success"}); - return next(); - }); - } else { - uid.forEach(function (id, i) { - uidStore.getPushIdByUid(id, function (pushIds) { - pushIds.forEach(function (result) { - ttlService.addPacketAndEmit(result, 'push', timeToLive, pushData, io, true); - }); - }); - }); - res.send({code: "success"}); - return next(); - } - } + res.statusCode = 400; + res.send({code: "error", message: "pushId is required"}); + return next(); } } }; - var handleNotification = function (req, res, next) { + if (self.apiAuth && !self.apiAuth("/api/notification", req, logger)) { + logger.error("notification denied %j %j", req.params, req.headers); + res.statusCode = 400; + res.send({code: "error", message: 'not authorized'}); + return next(); + } var notification = JSON.parse(req.params.notification); if (!notification) { + res.statusCode = 400; res.send({code: "error", message: 'notification is required'}); return next(); } var pushId = req.params.pushId; - var uid = req.params.uid; var pushAll = req.params.pushAll; var timeToLive = parseInt(req.params.timeToLive); - Logger.log("debug", "notification %s", JSON.stringify(req.params)); + logger.info("notification ", req.params); if (pushAll === 'true') { notificationService.sendAll(notification, timeToLive, io); @@ -143,20 +141,9 @@ function RestApi(io, stats, notificationService, port, uidStore, ttlService, red res.send({code: "success"}); return next(); } else { - if (uid) { - var uids; - if (typeof uid === 'string') { - uids = [uid]; - } else { - uids = uid; - } - uids.forEach(function (uid, i) { - uidStore.getPushIdByUid(uid, function (pushIds) { - notificationService.sendByPushIds(pushIds, timeToLive, notification, io); - }); - }); - res.send({code: "success"}); - } + res.statusCode = 400; + res.send({code: "error", message: "pushId is required"}); + return next(); } } }; @@ -176,17 +163,9 @@ function RestApi(io, stats, notificationService, port, uidStore, ttlService, red return next(); }; - var handleAddPushIdToUid = function (req, res, next) { - var uid = req.params.uid; - var pushId = req.params.pushId; - uidStore.addUid(pushId, uid, 3600 * 1000) - res.send({code: "success"}); - return next(); - }; - var handleQueryDataKeys = function (req, res, next) { stats.getQueryDataKeys(function (result) { - Logger.log('debug', "getQueryDataKeys result: " + result) + logger.debug("getQueryDataKeys result: " + result) res.send({"result": result}); }); return next(); @@ -206,21 +185,22 @@ function RestApi(io, stats, notificationService, port, uidStore, ttlService, red server.post('/api/push', handlePush); server.get('/api/notification', handleNotification); server.post('/api/notification', handleNotification); - server.get('/api/addPushIdToUid', handleAddPushIdToUid); - server.post('/api/addPushIdToUid', handleAddPushIdToUid); server.get('api/state/getQueryDataKeys', handleQueryDataKeys) server.get('/api/topicOnline', function (req, res, next) { + if (!topicOnline) { + res.statusCode = 400; + res.send({code: 'error', message: 'topicOnline not configured'}); + return next(); + } var topic = req.params.topic; - var key = "stats#topicOnline#" + topic; - redis.del(key, function () { - redis.publish("adminCommand", JSON.stringify({command: "topicOnline", topic: topic})); - setTimeout(function () { - redis.get(key, function (err, result) { - result = result || "0"; - res.send({topic: topic, online: result.toString()}); - }); - }, 3000); + if (!topic) { + res.statusCode = 400; + res.send({code: 'error', message: 'topic is required'}) + return next(); + } + topicOnline.getTopicOnline(topic, function (result) { + res.send({count: result, topic: req.params.topic}); }); return next(); }); @@ -331,7 +311,7 @@ function RestApi(io, stats, notificationService, port, uidStore, ttlService, red }); server.listen(port, function () { - Logger.log('debug', '%s listening at %s', server.name, server.url); + logger.debug('%s listening at %s', server.name, server.url); }); } diff --git a/push-server/lib/log/index.js b/push-server/lib/log/index.js index ee96998..f933437 100644 --- a/push-server/lib/log/index.js +++ b/push-server/lib/log/index.js @@ -1,70 +1,101 @@ -var loggerSingleton; - var winston = require('winston-levelonly'); var fs = require('fs'); -var Logger = function Logger(index, dir) { - console.log("new singleton"); - var dir = 'log'; - var workerId = 1; +var dir = 'log'; +var workerId = 1; +var transports = []; +var formatter = function (options) { + return options.timestamp() + " " + 'work:' + workerId + ' ' + options.level.substring(0, 1).toUpperCase() + '/' + (undefined !== options.message ? options.message : ''); +} + +var logger; - this.getLogger = function (tag, index, logDir) { - var fileTag = tag; - if(index){ - workerId = index; - } - if(logDir){ - dir = logDir; - if (!fs.existsSync(dir)) { - fs.mkdirSync(dir); - } - return; +function setArgs(args) { + if (args.workId) { + workerId = args.workId; + } + if (args.dir) { + dir = args.dir; + } + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir); + } + + if (args.count >= 10) { + if (workerId < 10) { + workerId = '0' + workerId; } - var opts = { - name: 'error', - json: false, - level: 'error', - datePattern: 'yyyy-MM-dd_error.log', - filename: dir + "/" + "log", - timestamp: function () { - return new Date().toLocaleString(); - }, - formatter: function (options) { - return options.timestamp() + " " + options.level.toUpperCase() + ' ' + ' ' + 'instance:' + workerId + ' ' - + fileTag + ' ' + (undefined !== options.message ? options.message : ''); - } - }; - var logger = new (winston.Logger)({ - transports: [ - new (winston.transports.Console)({ - level: 'debug', - levelOnly: false,//if true, will only log the specified level, if false will log from the specified level and above - timestamp: function () { - return new Date().toLocaleString(); - }, - formatter: function (options) { - return options.timestamp() + " " + options.level.toUpperCase() + ' ' + ' ' + 'instance:' + workerId + ' ' - + fileTag + ' ' + (undefined !== options.message ? options.message : ''); - } - }) - ] - }); - logger.add(winston.transports.DailyRotateFile, opts); - - opts.name = 'info'; - opts.level = 'info'; - opts.filename = dir + "/" + "log"; - opts.datePattern = 'yyyy-MM-dd_info.log'; - logger.add(winston.transports.DailyRotateFile, opts); - return logger; + } + + var level; + if (args.debug) { + level = 'debug'; + } else if (args.verbose) { + level = 'verbose'; + } else { + level = 'info'; + } + + var opts = { + name: 'error', + json: false, + level: 'error', + datePattern: 'yyyy-MM-dd.log', + filename: dir + "/" + "error_", + timestamp: function () { + return new Date().toLocaleString(); + }, + formatter: formatter }; + + transports.push(new (winston.transports.DailyRotateFile)(opts)); + + opts.name = level; + opts.level = level; + opts.filename = dir + "/" + level + "_"; + transports.push(new (winston.transports.DailyRotateFile)(opts)) + + if (args.foreground) { + opts.name = 'console'; + opts.levelOnly = false; + delete opts.filename; + delete opts.datePattern; + opts.level = level; + transports.push(new (winston.transports.Console)(opts)); + } + + logger = new (winston.Logger)({ + transports: transports + }); + } -Logger.getInstance = function () { - if (!loggerSingleton) { - loggerSingleton = new Logger(); +var LogProxy = function (logger, tag) { + this.logger = logger; + this.tag = tag; +}; + +var meta = {}; + +['debug', 'verbose', 'info', 'error'].forEach(function (command) { + + LogProxy.prototype[command] = function (key, arg, callback) { + if (this.logger) { + arguments[0] = this.tag + ' ' + arguments[0]; + var mainArguments = Array.prototype.slice.call(arguments); + mainArguments.push(meta); + this.logger[command].apply(this, mainArguments); + } + } + +}); + +var Logger = function Logger(tag) { + if ((typeof tag) == 'string') { + return new LogProxy(logger, tag); + } else { + setArgs(tag); } - return loggerSingleton; }; -module.exports = Logger.getInstance().getLogger; \ No newline at end of file +module.exports = Logger; \ No newline at end of file diff --git a/push-server/lib/push-server.js b/push-server/lib/push-server.js index c9c04f4..a76f0fb 100644 --- a/push-server/lib/push-server.js +++ b/push-server/lib/push-server.js @@ -1,6 +1,8 @@ module.exports = PushServer; function PushServer(config) { + if (!(this instanceof PushServer)) return new PushServer(config); + var self = this; console.log("config " + JSON.stringify(config)); var instance = config.instance || 1; console.log("starting instance #" + instance); @@ -25,21 +27,23 @@ function PushServer(config) { io.adapter(socketIoRedis); var packetService = require('./service/packetService.js')(cluster, cluster); - var uidStore = require('./redis/uidStore.js')(cluster); var TtlService = require('./service/ttlService.js'); var ttlService = new TtlService(cluster); var notificationService = require('./service/notificationService.js')(config.apns, cluster, ttlService); var ProxyServer = require('./server/proxyServer.js'); - var proxyServer = new ProxyServer(io, stats, packetService, notificationService, uidStore, ttlService); + var proxyServer = new ProxyServer(io, stats, packetService, notificationService, ttlService); var ApiThreshold = require('./api/apiThreshold.js'); var apiThreshold = new ApiThreshold(cluster); var AdminCommand = require('./server/adminCommand.js'); var adminCommand = new AdminCommand(cluster, stats, packetService, proxyServer, apiThreshold); - + var topicOnline; + if(config.topicOnlineFilter) { + topicOnline = require('./stats/topicOnline.js')(cluster, io, stats.id, config.topicOnlineFilter); + } if (apiPort) { var apnService = require('./service/apnService.js')(config.apns, config.apnsSliceServers, cluster, stats); notificationService.apnService = apnService; - var restApi = require('./api/restApi.js')(io, stats, notificationService, apiPort, uidStore, ttlService, cluster, apiThreshold, apnService); + self.restApi = require('./api/restApi.js')(io, topicOnline, stats, notificationService, apiPort, ttlService, cluster, apiThreshold, apnService, config.apiAuth); } }); } diff --git a/push-server/lib/redis/redisAdapter.js b/push-server/lib/redis/redisAdapter.js index 1cb7740..4982d13 100644 --- a/push-server/lib/redis/redisAdapter.js +++ b/push-server/lib/redis/redisAdapter.js @@ -6,7 +6,7 @@ var uid2 = require('uid2'); var redis = require('redis').createClient; var msgpack = require('msgpack-js'); var Adapter = require('socket.io-adapter'); -var Logger = require('../log/index.js')('RedisAdapter'); +var logger = require('../log/index.js')('RedisAdapter'); var async = require('async'); @@ -62,6 +62,7 @@ function adapter(uri, opts, stats) { */ function Redis(nsp) { + Adapter.call(this, nsp); this.uid = uid; @@ -88,14 +89,22 @@ function adapter(uri, opts, stats) { */ Redis.prototype.onmessage = function (channel, msg) { - Logger.log("info", 'channel %s', channel.toString().startsWith(prefix)); + if (stats && stats.shouldDrop()) { return; } + + if (!channel.toString().startsWith(prefix)) { + logger.debug('skip parse channel %s', prefix); + } + var args = msgpack.decode(msg); var packet; - if (uid == args.shift()) return Logger.log("info", 'ignore same uid'); + if (uid == args.shift()) { + logger.verbose('ignore same uid'); + return ; + } packet = args[0]; @@ -104,7 +113,8 @@ function adapter(uri, opts, stats) { } if (!packet || packet.nsp != this.nsp.name) { - return Logger.log("info", 'ignore different namespace'); + logger.verbose('ignore different namespace'); + return; } args.push(true); @@ -148,21 +158,21 @@ function adapter(uri, opts, stats) { Redis.prototype.add = function (id, room, fn) { var self = this; - Logger.log("info", 'adding %s to %s', id, room); + logger.verbose('adding %s to %s', id, room); var needRedisSub = this.rooms.hasOwnProperty(room) && this.rooms[room] Adapter.prototype.add.call(this, id, room); var channel = prefix + '#' + this.nsp.name + '#' + room + '#'; if (id == room) { - Logger.log("info", "skip add to id %s", room); + logger.verbose("verbose", "skip add to id %s", room); return; } if (needRedisSub) { - Loggerlog("info", "skip re-subscribe to room %s", room); + logger.verbose("verbose", "skip re-subscribe to room %s", room); return; } sub.subscribe(channel, function (err) { if (err) { - Logger.log("info", 'subscribe error %s', channel); + logger.verbose("error", 'subscribe error %s', channel); self.emit('error', err); if (fn) fn(err); return; @@ -181,7 +191,7 @@ function adapter(uri, opts, stats) { */ Redis.prototype.del = function (id, room, fn) { - Logger.log("info", 'removing %s from %s', id, room); + logger.verbose('removing %s from %s', id, room); var self = this; var hasRoom = this.rooms.hasOwnProperty(room); Adapter.prototype.del.call(this, id, room); @@ -189,7 +199,7 @@ function adapter(uri, opts, stats) { if (hasRoom && !this.rooms[room]) { var channel = prefix + '#' + this.nsp.name + '#' + room + '#'; - Logger.log("info", 'unsubscribing %s', channel); + logger.verbose('unsubscribing %s', channel); sub.unsubscribe(channel, function (err) { if (err) { self.emit('error', err); @@ -212,7 +222,7 @@ function adapter(uri, opts, stats) { */ Redis.prototype.delAll = function (id, fn) { - Logger.log("info", 'removing %s from all rooms', id); + logger.debug('removing %s from all rooms', id); var self = this; var rooms = this.sids[id]; diff --git a/push-server/lib/redis/sentinel.js b/push-server/lib/redis/sentinel.js index 3b6a445..3d781da 100644 --- a/push-server/lib/redis/sentinel.js +++ b/push-server/lib/redis/sentinel.js @@ -1,7 +1,7 @@ module.exports = Sentinel; var redis = require('redis'); -var Logger = require('../log/index.js')('Sentinel'); +var logger = require('../log/index.js')('Sentinel'); function Sentinel(sentinelAddrs, masterNames, ipMap, completeCallback, masterChangeCallback) { var masters = []; @@ -18,7 +18,7 @@ function Sentinel(sentinelAddrs, masterNames, ipMap, completeCallback, masterCha connect_timeout: 10000000000000000 }); client.on("error", function (err) { - Logger.log('error', "sentinel connect Error %s:%s %s", addr.host, addr.port, err); + logger.error("sentinel connect Error %s:%s %s", addr.host, addr.port, err); }); masterNames.forEach(function (masterName, i) { @@ -27,7 +27,7 @@ function Sentinel(sentinelAddrs, masterNames, ipMap, completeCallback, masterCha }; client.send_command("SENTINEL", ['get-master-addr-by-name', masterName], function (err, replies) { if (replies && outerThis.completeCallback) { - Logger.log('info', "get-master-addr-by-name %s %j", masterName, replies); + logger.info("get-master-addr-by-name %s %j", masterName, replies); var allQueried = true; masters.forEach(function (master) { if (master.name == masterName) { @@ -38,7 +38,7 @@ function Sentinel(sentinelAddrs, masterNames, ipMap, completeCallback, masterCha } }); if (allQueried) { - Logger.log('info', "masters all queried %j", masters); + logger.info("masters all queried %j", masters); outerThis.completeCallback(); outerThis.completeCallback = null; } @@ -55,7 +55,7 @@ function Sentinel(sentinelAddrs, masterNames, ipMap, completeCallback, masterCha connect_timeout: 10000000000000000 }); subClient.on("error", function (err) { - Logger.log('error', "sentinel subscribe Error %s:%s %s", addr.host, addr.port, err); + logger.error("sentinel subscribe Error %s:%s %s", addr.host, addr.port, err); }); subClient.on("message", function (channel, message) { @@ -64,12 +64,12 @@ function Sentinel(sentinelAddrs, masterNames, ipMap, completeCallback, masterCha var name = lines[0]; var host = lines[3].toString(); var port = parseInt(lines[4].toString()); - Logger.log('info', "+switch-master %j %j \n", lines, masters, name, host, port); + logger.info("+switch-master %j %j \n", lines, masters, name, host, port); masters.forEach(function (master, i) { if (master && master.name == name) { master.host = getIp(host); master.port = port; - Logger.log('info', "switch master callback %j", master); + logger.info("switch master callback %j", master); masterChangeCallback(master, i); return; } @@ -82,7 +82,7 @@ function Sentinel(sentinelAddrs, masterNames, ipMap, completeCallback, masterCha function getIp(fromSentinel) { if (ipMap && ipMap[fromSentinel]) { - Logger.log('info', 'getIp %s -> %s', fromSentinel, ipMap[fromSentinel]); + logger.info('getIp %s -> %s', fromSentinel, ipMap[fromSentinel]); return ipMap[fromSentinel]; } else { return fromSentinel; diff --git a/push-server/lib/redis/simpleRedisHashCluster.js b/push-server/lib/redis/simpleRedisHashCluster.js index a3392b8..af259b9 100644 --- a/push-server/lib/redis/simpleRedisHashCluster.js +++ b/push-server/lib/redis/simpleRedisHashCluster.js @@ -3,87 +3,32 @@ module.exports = SimpleRedisHashCluster; var commands = require('redis-commands'); var redis = require('redis'); var util = require("../util/util.js"); -var Logger = require('../log/index.js')('SimpleRedisHashCluster'); +var logger = require('../log/index.js')('SimpleRedisHashCluster'); + function SimpleRedisHashCluster(config, completeCallback) { - this.masters = []; - this.subSlaves = []; - this.readSlaves = []; this.messageCallbacks = []; - var outerThis = this; - var masterAddrs = config.masters; - var slaveAddrs = config.slaves; - if (!slaveAddrs) { - slaveAddrs = masterAddrs; + this.write = getClientsFromIpList(config.write); + this.read = getClientsFromIpList(config.read); + if (this.read.length == 0) { + logger.info("read slave not in config using write"); + this.read = this.write; } - slaveAddrs.forEach(function (addr) { - var subClient = redis.createClient({ - host: addr.host, - port: addr.port, - return_buffers: true, - retry_max_delay: 3000, - max_attempts: 0, - connect_timeout: 10000000000000000 - }); - subClient.on("error", function (err) { - Logger.error("redis slave connect Error %s:%s %s", addr.host, addr.port, err); - }); - subClient.on("message", function (channel, message) { - outerThis.messageCallbacks.forEach(function (callback) { - try { - callback(channel, message); - } catch (err) { - } - }); + this.sub = getClientsFromIpList(config.sub, this); + this.pubs = []; + var self = this; + if (config.pubs) { + config.pubs.forEach(function (pub) { + self.pubs.push(getClientsFromIpList(pub)); }); - outerThis.subSlaves.push(subClient) - var readClient = redis.createClient({ - host: addr.host, - port: addr.port, - return_buffers: true, - retry_max_delay: 3000, - max_attempts: 0, - connect_timeout: 10000000000000000 - }); - readClient.on("error", function (err) { - Logger.error("redis slave connect Error %s:%s %s", addr.host, addr.port, err); - }); - outerThis.readSlaves.push(readClient); - }); + } + completeCallback(this); +} - if (config.sentinels) { - Logger.log('info', 'use sentinels %j', config.sentinels) - var Sentinel = require('./sentinel.js'); - var sentinel = new Sentinel(config.sentinels, config.sentinelMasters, config.ipMap, function () { - sentinel.masters.forEach(function (addr) { - var client = redis.createClient({ - host: addr.host, - port: addr.port, - return_buffers: true, - retry_max_delay: 3000, - max_attempts: 0, - connect_timeout: 10000000000000000 - }); - client.on("error", function (err) { - Logger.error("redis master connect Error %s", err); - }); - outerThis.masters.push(client); - }); - var defaultPubAddr = util.getByHash(sentinel.masters, "packetProxy#default"); - Logger.log('debug', "packetProxy#default " + defaultPubAddr.host + ":" + defaultPubAddr.port); - completeCallback(outerThis); - }, function (newMaster, i) { - var master = outerThis.masters[i]; - Logger.log('info', 'current master %j', master.connection_options); - if (master.connection_options.port != newMaster.port || master.connection_options.host != newMaster.host) { - Logger.log('info', "switch master %j", newMaster); - master.connection_options.port = newMaster.port; - master.connection_options.host = newMaster.host; - } - }); - } else { - Logger.info('use masters %s', JSON.stringify(masterAddrs)); - masterAddrs.forEach(function (addr) { +function getClientsFromIpList(addrs, subscribe) { + var clients = []; + if (addrs) { + addrs.forEach(function (addr) { var client = redis.createClient({ host: addr.host, port: addr.port, @@ -93,46 +38,49 @@ function SimpleRedisHashCluster(config, completeCallback) { connect_timeout: 10000000000000000 }); client.on("error", function (err) { - Logger.error("redis master %s", err); + logger.error("redis error %s", err); }); - outerThis.masters.push(client); + if (subscribe) { + client.on("message", function (channel, message) { + subscribe.messageCallbacks.forEach(function (callback) { + try { + callback(channel, message); + } catch (err) { + logger.error("redis message error %s", err); + } + }); + }); + } + clients.push(client); }); - var defaultPubAddr = util.getByHash(masterAddrs, "packetProxy#default"); - Logger.log('debug', "packetProxy#default " + defaultPubAddr.host + ":" + defaultPubAddr.port); - completeCallback(outerThis); } + return clients; } + commands.list.forEach(function (command) { SimpleRedisHashCluster.prototype[command.toUpperCase()] = SimpleRedisHashCluster.prototype[command] = function (key, arg, callback) { - if (Array.isArray(key)) { - Logger.log('debug', "multiple key not supported "); - throw "multiple key not supported"; - } - var client; - if (this.masters.length == 1) { - client = this.masters[0]; - } else { - client = util.getByHash(this.masters, key); - } + var client = util.getByHash(this.write, key); handleCommand(command, arguments, key, arg, callback, client); } }); +['publish'].forEach(function (command) { + + SimpleRedisHashCluster.prototype[command.toUpperCase()] = SimpleRedisHashCluster.prototype[command] = function (key, arg, callback) { + this.pubs.forEach(function (pub) { + var client = util.getByHash(pub, key); + handleCommand(command, arguments, key, arg, callback, client); + }); + } + +}); + ['subscribe', 'unsubscribe'].forEach(function (command) { SimpleRedisHashCluster.prototype[command.toUpperCase()] = SimpleRedisHashCluster.prototype[command] = function (key, arg, callback) { - if (Array.isArray(key)) { - Logger.log('debug', "multiple key not supported "); - throw "multiple key not supported"; - } - var client; - if (this.subSlaves.length == 1) { - client = this.subSlaves[0]; - } else { - client = util.getByHash(this.subSlaves, key); - } + var client = util.getByHash(this.sub, key); handleCommand(command, arguments, key, arg, callback, client); } @@ -141,22 +89,17 @@ commands.list.forEach(function (command) { ['get', 'hkeys', 'hgetall', 'pttl', 'lrange'].forEach(function (command) { SimpleRedisHashCluster.prototype[command.toUpperCase()] = SimpleRedisHashCluster.prototype[command] = function (key, arg, callback) { - if (Array.isArray(key)) { - Logger.log('debug', "multiple key not supported "); - throw "multiple key not supported"; - } - var client; - if (this.readSlaves.length == 1) { - client = this.readSlaves[0]; - } else { - client = util.getByHash(this.readSlaves, key); - } + var client = util.getByHash(this.read, key); handleCommand(command, arguments, key, arg, callback, client); } }); function handleCommand(command, callArguments, key, arg, callback, client) { + if (!client) { + logger.error("handleCommand error ", command, key); + return; + } if (Array.isArray(arg)) { arg = [key].concat(arg); @@ -184,7 +127,7 @@ SimpleRedisHashCluster.prototype.on = function (message, callback) { this.messageCallbacks.push(callback); } else { var err = "on " + message + " not supported"; - Logger.error(error); + logger.error(error); throw err; } } diff --git a/push-server/lib/redis/uidStore.js b/push-server/lib/redis/uidStore.js deleted file mode 100644 index 861b599..0000000 --- a/push-server/lib/redis/uidStore.js +++ /dev/null @@ -1,51 +0,0 @@ -module.exports = UidStore; -var Logger = require('../log/index.js')('UidStore'); - -function UidStore(redis, subClient) { - if (!(this instanceof UidStore)) return new UidStore(redis, subClient); - this.redis = redis; -} - -UidStore.prototype.addUid = function(pushId, uid, timeToLive) { - Logger.info("addUid pushId %s %s", uid, pushId); - var key = "pushIdToUid#" + pushId; - var ourThis = this; - this.getUidByPushId(pushId, function(oldUid){ - if(oldUid) { - Logger.log('info', "remove %s from old uid %s", pushId, oldUid); - ourThis.redis.hdel("uidToPushId#" + oldUid, pushId); - } - ourThis.redis.set(key, uid); - if(timeToLive){ - ourThis.redis.expire(key, timeToLive); - } - ourThis.redis.hset("uidToPushId#" + uid, pushId , Date.now()); - }); -}; - -UidStore.prototype.removePushId = function (pushId) { - Logger.info("removePushId pushId %s %s", uid, pushId); - var key = "pushIdToUid#" + pushId; - var ourThis = this; - this.redis.get(key, function (err, oldUid) { - if (oldUid) { - Logger.log('info', "remove %s from old uid %s", pushId, oldUid); - ourThis.redis.hdel("uidToPushId#" + uid, pushId); - ourThis.redis.del(key); - } - }); -}; - -UidStore.prototype.getUidByPushId = function (pushId, callback) { - this.redis.get("pushIdToUid#" + pushId, function (err, uid) { - // reply is null when the key is missing - Logger.info("getUidByPushId %s %s", pushId, uid); - callback(uid); - }); -}; - -UidStore.prototype.getPushIdByUid = function (uid, callback) { - this.redis.hkeys("uidToPushId#" + uid, function (err, replies) { - callback(replies); - }); -}; diff --git a/push-server/lib/server/adminCommand.js b/push-server/lib/server/adminCommand.js index 60dc109..35605d3 100644 --- a/push-server/lib/server/adminCommand.js +++ b/push-server/lib/server/adminCommand.js @@ -1,14 +1,14 @@ module.exports = AdminCommand; -var Logger = require('../log/index.js')('AdminCommand'); +var logger = require('../log/index.js')('AdminCommand'); function AdminCommand(redis, stats, packetSevice, proxyServer, apiThrehold) { redis.on("message", function (channel, message) { if (channel == "adminCommand") { var command = JSON.parse(message); - Logger.log('debug', 'adminCommand %j', command); + logger.debug( 'adminCommand %j', command); if (command.command == 'packetDropThreshold') { - Logger.log('debug', 'setting packetDropThreshold %d', stats.packetDropThreshold); + logger.debug( 'setting packetDropThreshold %d', stats.packetDropThreshold); stats.packetDropThreshold = command.packetDropThreshold; } else if (command.command == 'stopPacketService') { packetSevice.stopped = true; diff --git a/push-server/lib/server/proxyServer.js b/push-server/lib/server/proxyServer.js index 52400bd..7f4a2d1 100644 --- a/push-server/lib/server/proxyServer.js +++ b/push-server/lib/server/proxyServer.js @@ -1,8 +1,8 @@ module.exports = ProxyServer; -var Logger = require('../log/index.js')('ProxyServer'); +var logger = require('../log/index.js')('ProxyServer'); var http = require('http'); -function ProxyServer(io, stats, packetService, notificationService, uidStore, ttlService) { +function ProxyServer(io, stats, packetService, notificationService, ttlService) { this.io = io; io.on('connection', function (socket) { @@ -13,7 +13,7 @@ function ProxyServer(io, stats, packetService, notificationService, uidStore, tt stats.removeSession(); stats.removePlatformSession(socket.platform); if (socket.pushId) { - Logger.log('debug', "publishDisconnect %s", socket.pushId); + logger.debug( "publishDisconnect %s", socket.pushId); packetService.publishDisconnect(socket); } }); @@ -29,7 +29,7 @@ function ProxyServer(io, stats, packetService, notificationService, uidStore, tt socket.on('pushId', function (data) { if (data.id && data.id.length >= 10) { - Logger.log('debug', "on pushId %j", data); + logger.debug( "on pushId %j", data); if (data.platform) { socket.platform = data.platform.toLowerCase(); } @@ -53,19 +53,13 @@ function ProxyServer(io, stats, packetService, notificationService, uidStore, tt ttlService.getPackets(data.id, data.lastUnicastId, socket); } - uidStore.getUidByPushId(data.id, function (uid) { - var reply = {id: data.id}; - if (uid) { - reply.uid = uid; - socket.uid = uid; - } - socket.pushId = data.id; - packetService.publishConnect(socket); - socket.join(data.id); - socket.emit('pushId', reply); - Logger.log('debug', 'join room socket.id %s ,pushId %s', socket.id, socket.pushId); - ttlService.onPushId(socket); - }) + var reply = {id: data.id}; + socket.pushId = data.id; + packetService.publishConnect(socket); + socket.join(data.id); + socket.emit('pushId', reply); + logger.debug( 'join room socket.id %s ,pushId %s', socket.id, socket.pushId); + ttlService.onPushId(socket); } }); @@ -77,13 +71,13 @@ function ProxyServer(io, stats, packetService, notificationService, uidStore, tt socket.on('unsubscribeTopic', function (data) { - Logger.log('debug', "on unsubscribeTopic %j", data); + logger.debug( "on unsubscribeTopic %j", data); var topic = data.topic; socket.leave(topic); }); socket.on('apnToken', function (data) { - Logger.log('debug', "on apnToken %j", data); + logger.debug( "on apnToken %j", data); var pushId = data.pushId; var apnToken = data.apnToken; notificationService.setApnToken(pushId, apnToken, data.bundleId); @@ -91,9 +85,6 @@ function ProxyServer(io, stats, packetService, notificationService, uidStore, tt socket.on('packetProxy', function (data) { data.pushId = socket.pushId; - if (socket.uid) { - data.uid = socket.uid; - } packetService.publishPacket(data); }); @@ -107,6 +98,6 @@ function ProxyServer(io, stats, packetService, notificationService, uidStore, tt ProxyServer.prototype.getTopicOnline = function (topic) { var online = this.io.nsps['/'].adapter.rooms[topic].length; - Logger.log('debug', "on topic online %s %d", topic, online); + logger.debug( "on topic online %s %d", topic, online); return online; } \ No newline at end of file diff --git a/push-server/lib/service/apnService.js b/push-server/lib/service/apnService.js index b5b8ea5..f1671e0 100644 --- a/push-server/lib/service/apnService.js +++ b/push-server/lib/service/apnService.js @@ -1,6 +1,6 @@ module.exports = ApnService; -var Logger = require('../log/index.js')('ApnService'); +var logger = require('../log/index.js')('ApnService'); var util = require('../util/util.js'); var apn = require('apn'); @@ -23,18 +23,18 @@ function ApnService(apnConfigs, sliceServers, redis, stats) { apnConfig.errorCallback = function (errorCode, notification, device) { if (device && device.token) { var id = device.token.toString('hex'); - Logger.error("apn errorCallback errorCode %d %s", errorCode, id); + logger.error("apn errorCallback errorCode %d %s", errorCode, id); stats.addApnError(1, errorCode); redis.hdel("apnTokens#" + apnConfig.bundleId, id); redis.get("apnTokenToPushId#" + id, function (err, oldPushId) { - Logger.error("apn errorCallback pushId %s", oldPushId); + logger.error("apn errorCallback pushId %s", oldPushId); if (oldPushId) { redis.del("pushIdToApnData#" + oldPushId); redis.del("apnTokenToPushId#" + id); } }); } else { - Logger.error("apn errorCallback no token %s %j", errorCode, device); + logger.error("apn errorCallback no token %s %j", errorCode, device); } } var connection = apn.Connection(apnConfig); @@ -43,12 +43,12 @@ function ApnService(apnConfigs, sliceServers, redis, stats) { connection.on("transmitted", function () { stats.addApnSuccess(1); }); - Logger.info("apnConnections init for %s maxConnections %s", apnConfig.bundleId, apnConfig.maxConnections); + logger.info("apnConnections init for %s maxConnections %s", apnConfig.bundleId, apnConfig.maxConnections); }); this.bundleIds = Object.keys(this.apnConnections); this.defaultBundleId = this.bundleIds[0]; - Logger.info("defaultBundleId %s", this.defaultBundleId); + logger.info("defaultBundleId %s", this.defaultBundleId); } @@ -59,7 +59,7 @@ ApnService.prototype.sendOne = function (apnData, notification, timeToLive) { this.stats.addApnTotal(1); var note = toApnNotification(notification, timeToLive); apnConnection.pushNotification(note, apnData.apnToken); - Logger.info("send to notification to ios %s %s", apnData.bundleId, apnData.apnToken); + logger.info("send to notification to ios %s %s", apnData.bundleId, apnData.apnToken); } }; @@ -85,7 +85,7 @@ ApnService.prototype.sendToApn = function (tokenToTime, bundleId, note) { var token = tokenToTime[i]; var time = tokenToTime[i + 1]; if (timestamp - time > apnTokenTTL * 1000) { - Logger.info("delete outdated apnToken %s", token); + logger.info("delete outdated apnToken %s", token); this.redis.hdel("apnTokens#" + bundleId, token); } else { tokens.push(token.toString()); @@ -95,7 +95,7 @@ ApnService.prototype.sendToApn = function (tokenToTime, bundleId, note) { for (var token in tokenToTime) { var time = tokenToTime[token]; if (timestamp - time > apnTokenTTL * 1000) { - Logger.info("delete outdated apnToken %s", token); + logger.info("delete outdated apnToken %s", token); this.redis.hdel("apnTokens#" + bundleId, token); } else { tokens.push(token); @@ -103,7 +103,7 @@ ApnService.prototype.sendToApn = function (tokenToTime, bundleId, note) { } } if (tokens.length > 0) { - Logger.info("send apn %s", tokens); + logger.info("send apn %s", tokens); apnConnection.pushNotification(note, tokens); this.stats.addApnTotal(tokens.length); } @@ -130,7 +130,7 @@ ApnService.prototype.sendAll = function (notification, timeToLive) { .set('Accept', 'application/json') .end(function (err, res) { if (err || res.text != '{"code":"success"}') { - Logger.error("slicing error %s %s %s", pattern, apiUrl, res && res.text); + logger.error("slicing error %s %s %s", pattern, apiUrl, res && res.text); } }); }); diff --git a/push-server/lib/service/notificationService.js b/push-server/lib/service/notificationService.js index 7a27c78..0a49941 100644 --- a/push-server/lib/service/notificationService.js +++ b/push-server/lib/service/notificationService.js @@ -1,6 +1,6 @@ module.exports = NotificationService; -var Logger = require('../log/index.js')('NotificationService'); +var logger = require('../log/index.js')('NotificationService'); var util = require('../util/util.js'); var apn = require('apn'); var apnTokenTTL = 3600 * 24 * 7; @@ -17,7 +17,7 @@ function NotificationService(apnConfigs, redis, ttlService) { } }); - Logger.info("defaultBundleId %s", this.defaultBundleId); + logger.info("defaultBundleId %s", this.defaultBundleId); } NotificationService.prototype.setApnToken = function (pushId, apnToken, bundleId) { @@ -28,16 +28,16 @@ NotificationService.prototype.setApnToken = function (pushId, apnToken, bundleId try { var buffer = new Buffer(apnToken, 'hex'); } catch (err) { - Logger.info("invalid apnToken format %s", apnToken); + logger.info("invalid apnToken format %s", apnToken); return; } var apnData = JSON.stringify({bundleId: bundleId, apnToken: apnToken}); var outerThis = this; this.redis.get("apnTokenToPushId#" + apnToken, function (err, oldPushId) { - Logger.info("oldPushId %s", oldPushId); + logger.info("oldPushId %s", oldPushId); if (oldPushId && oldPushId != pushId) { outerThis.redis.del("pushIdToApnData#" + oldPushId); - Logger.info("remove old pushId to apnToken %s %s", oldPushId, apnData); + logger.info("remove old pushId to apnToken %s %s", oldPushId, apnData); } outerThis.redis.set("apnTokenToPushId#" + apnToken, pushId); outerThis.redis.set("pushIdToApnData#" + pushId, apnData); @@ -52,12 +52,12 @@ NotificationService.prototype.sendByPushIds = function (pushIds, timeToLive, not var outerThis = this; pushIds.forEach(function (pushId) { outerThis.redis.get("pushIdToApnData#" + pushId, function (err, reply) { - Logger.info("pushIdToApnData %s %s", pushId, JSON.stringify(reply)); + logger.info("pushIdToApnData %s %s", pushId, JSON.stringify(reply)); if (reply) { var apnData = JSON.parse(reply); outerThis.apnService.sendOne(apnData, notification, timeToLive); } else { - Logger.info("send notification to android %s", pushId); + logger.info("send notification to android %s", pushId); outerThis.ttlService.addPacketAndEmit(pushId, 'noti', timeToLive, notification, io, true); } }); diff --git a/push-server/lib/service/packetService.js b/push-server/lib/service/packetService.js index c56a216..c938670 100644 --- a/push-server/lib/service/packetService.js +++ b/push-server/lib/service/packetService.js @@ -1,6 +1,6 @@ module.exports = PacketService; -var Logger = require('../log/index.js')('PacketService'); +var logger = require('../log/index.js')('PacketService'); var randomstring = require("randomstring"); var pathToServer = {}; @@ -19,15 +19,15 @@ String.prototype.hashCode = function () { function PacketService(redis, subClient) { if (!(this instanceof PacketService)) return new PacketService(redis, subClient); this.redis = redis; - this.stopped = false; - subClient.on("message", function (channel, message) { - Logger.info("subscribe message " + channel + ": " + message) - if (channel == "packetServer") { - var handlerInfo = JSON.parse(message); - updatePathServer(handlerInfo); - } - }); - subClient.subscribe("packetServer"); + this.stopped = true; + //subClient.on("message", function (channel, message) { + // logger.info("subscribe message " + channel + ": " + message) + // if (channel == "packetServer") { + // var handlerInfo = JSON.parse(message); + // updatePathServer(handlerInfo); + // } + //}); + //subClient.subscribe("packetServer"); } function updatePathServer(handlerInfo) { @@ -46,13 +46,13 @@ function updatePathServer(handlerInfo) { updatedServers.push(server); found = true; } else if (timestamp - server.timestamp > 10000) { - Logger.info("server is dead %s", server.serverId); + logger.info("server is dead %s", server.serverId); } else { updatedServers.push(server); } } if (!found) { - Logger.info("new server is added %s", serverId); + logger.info("new server is added %s", serverId); updatedServers.push({serverId: serverId, timestamp: timestamp}); } pathToServer[path] = updatedServers; @@ -81,7 +81,7 @@ PacketService.prototype.publishPacket = function (data) { if (servers[idx]) { var serverId = servers[idx]["serverId"]; this.redis.publish("packetProxy#" + serverId, strData); - Logger.info("publishPacket %s %s", serverId, strData); + logger.info("publishPacket %s %s", serverId, strData); return; } } @@ -93,13 +93,13 @@ PacketService.prototype.publishDisconnect = function (socket) { if (this.stopped) { return; } - Logger.info("publishDisconnect pushId %s", socket.pushId); + logger.info("publishDisconnect pushId %s", socket.pushId); var outerThis = this; this.redis.get("pushIdSocketId#" + socket.pushId, function (err, lastSocketId) { // reply is null when the key is missing - Logger.info("pushIdSocketId redis %s %s %s", socket.id, lastSocketId, socket.pushId); + logger.info("pushIdSocketId redis %s %s %s", socket.id, lastSocketId, socket.pushId); if (lastSocketId == socket.id) { - Logger.info("publishDisconnect current socket disconnect %s", socket.id); + logger.info("publishDisconnect current socket disconnect %s", socket.id); outerThis.redis.del("pushIdSocketId#" + socket.pushId); var data = {pushId: socket.pushId, path: "/socketDisconnect"}; if (socket.uid) { @@ -114,15 +114,15 @@ PacketService.prototype.publishConnect = function (socket) { if (this.stopped) { return; } - Logger.info("publishConnect pushId %s", socket.pushId); + logger.debug("publishConnect pushId %s", socket.pushId); var outerThis = this; this.redis.get("pushIdSocketId#" + socket.pushId, function (err, lastSocketId) { // reply is null when the key is missing - Logger.info("publishConnect query redis %s", lastSocketId); + logger.verbose("publishConnect query redis %s", lastSocketId); if (lastSocketId) { - Logger.info("reconnect do not publish", lastSocketId); + logger.verbose("reconnect do not publish", lastSocketId); } else { - Logger.info("first connect publish", lastSocketId); + logger.verbose("first connect publish", lastSocketId); var data = {pushId: socket.pushId, path: "/socketConnect"}; if (socket.uid) { data.uid = socket.uid; diff --git a/push-server/lib/service/ttlService.js b/push-server/lib/service/ttlService.js index b3f520d..ce0a287 100644 --- a/push-server/lib/service/ttlService.js +++ b/push-server/lib/service/ttlService.js @@ -1,6 +1,6 @@ module.exports = TTLService; -var Logger = require('../log/index.js')('TTLService'); +var logger = require('../log/index.js')('TTLService'); var randomstring = require("randomstring"); function TTLService(redis) { @@ -16,7 +16,7 @@ var maxTllPacketPerTopic = -50; TTLService.prototype.addPacketAndEmit = function (topic, event, timeToLive, packet, io, unicast) { packet.id = randomstring.generate(12); if (timeToLive > 0) { - Logger.info("addPacket %s %s %s", topic, event, timeToLive); + logger.info("addPacket %s %s %s", topic, event, timeToLive); packet.ttl = ""; if (unicast) { packet.unicast = ""; @@ -27,7 +27,7 @@ TTLService.prototype.addPacketAndEmit = function (topic, event, timeToLive, pack data.event = event; var listKey = "ttl#packet#" + topic; redis.pttl(listKey, function (err, oldTtl) { - Logger.info("addPacket key %s , %d , %d", listKey, oldTtl, timeToLive); + logger.info("addPacket key %s , %d , %d", listKey, oldTtl, timeToLive); redis.rpush(listKey, JSON.stringify(data)); redis.ltrim(listKey, maxTllPacketPerTopic, -1); if (timeToLive > oldTtl) { @@ -51,15 +51,15 @@ TTLService.prototype.getPackets = function (topic, lastId, socket) { var now = Date.now(); if (jsonPacket.id == lastId) { lastFound = true; - Logger.info("lastFound %s %s", jsonPacket.id, lastId); + logger.info("lastFound %s %s", jsonPacket.id, lastId); } else if (lastFound == true && jsonPacket.timestampValid > now) { - Logger.info("call emitPacket %s %s", jsonPacket.id, lastId); + logger.info("call emitPacket %s %s", jsonPacket.id, lastId); emitPacket(socket, jsonPacket); } }); if (!lastFound) { - Logger.info('lastId %s not found send all packets', lastId); + logger.info('lastId %s not found send all packets', lastId); list.forEach(function (packet) { var jsonPacket = JSON.parse(packet); if (jsonPacket.timestampValid > now) { @@ -76,6 +76,6 @@ function emitPacket(socket, packet) { var event = packet.event; delete packet.event; delete packet.timestampValid; - Logger.info("emitPacket %s %j", event, packet); + logger.info("emitPacket %s %j", event, packet); socket.emit(event, packet); } \ No newline at end of file diff --git a/push-server/lib/stats/redisIncrBuffer.js b/push-server/lib/stats/redisIncrBuffer.js index ff6ff8f..1f4ce7b 100644 --- a/push-server/lib/stats/redisIncrBuffer.js +++ b/push-server/lib/stats/redisIncrBuffer.js @@ -1,7 +1,5 @@ module.exports = RedisIncrBuffer; -var Logger = require('../log/index.js')('RedisIncrBuffer'); - function RedisIncrBuffer(redis) { if (!(this instanceof RedisIncrBuffer)) return new RedisIncrBuffer(redis); this.redis = redis; @@ -19,13 +17,11 @@ RedisIncrBuffer.prototype.incrby = function (key, by) { RedisIncrBuffer.prototype.checkCommit = function () { var timestamp = Date.now(); if ((timestamp - this.timestamp) > this.commitThreshold) { - Logger.info("stats threshold committing"); for (var key in this.map) { this.redis.incrby(key, this.map[key]); var index = key.indexOf("#totalCount"); if(index != -1){ var str = key.substring(6, index); - Logger.info("checkCommit: " + str); //stats#request#/addDot#totalCount#1457344800000 this.redis.hset("queryDataKeys", str, Date.now()) } } diff --git a/push-server/lib/stats/stats.js b/push-server/lib/stats/stats.js index 80b5c28..1f124a4 100644 --- a/push-server/lib/stats/stats.js +++ b/push-server/lib/stats/stats.js @@ -1,6 +1,6 @@ module.exports = Stats; -var Logger = require('../log/index.js')('Stats'); +var logger = require('../log/index.js')('Stats'); var randomstring = require("randomstring"); function Stats(redis, port) { @@ -16,7 +16,7 @@ function Stats(redis, port) { if (fs.existsSync(ipPath)) { ip = fs.readFileSync(ipPath, "utf8").trim() + ":" + port; } - Logger.info("ip file %s %s", ipPath, ip); + logger.verbose("ip file %s %s", ipPath, ip); this.id = ip || randomstring.generate(32); var stats = this; setInterval(function () { @@ -35,7 +35,7 @@ function Stats(redis, port) { Stats.prototype.shouldDrop = function () { if (this.packetDropThreshold != 0 && this.packetAverage1 && this.packetAverage1 > this.packetDropThreshold) { - Logger.info('threshold exceeded dropping packet %d > %d', this.packetAverage1, this.packetDropThreshold); + logger.verbose('threshold exceeded dropping packet %d > %d', this.packetAverage1, this.packetDropThreshold); this.packetDrop++; return true; } else { @@ -97,7 +97,7 @@ Stats.prototype.addSession = function (socket, count) { var stats = this; socket.on('stats', function (data) { - Logger.info("on stats %s", JSON.stringify(data.requestStats)); + logger.verbose("on stats %s", JSON.stringify(data.requestStats)); var timestamp = Date.now(); var totalCount = 0; if (data.requestStats && data.requestStats.length) { @@ -128,7 +128,7 @@ Stats.prototype.incr = function (key, timestamp) { var hourKey = hourStrip(timestamp); key = key + "#" + hourKey; this.redisIncrBuffer.incrby(key, 1); - Logger.info("incr %s %s", key, hourKey); + logger.verbose("incr %s %s", key, hourKey); }; Stats.prototype.incrby = function (key, timestamp, by) { @@ -136,17 +136,17 @@ Stats.prototype.incrby = function (key, timestamp, by) { var hourKey = hourStrip(timestamp); key = key + "#" + hourKey; this.redisIncrBuffer.incrby(key, by); - Logger.info("incrby %s %s by %d ", key, hourKey, by); + logger.verbose("incrby %s %s by %d ", key, hourKey, by); } }; Stats.prototype.onNotificationReply = function (timestamp) { var latency = Date.now() - timestamp; - Logger.info('onNotificationReply %s', latency); + logger.verbose('onNotificationReply %s', latency); if (latency < 10000) { this.incr("stats#notification#successCount", timestamp); this.incrby("stats#notification#totalLatency", timestamp, latency); - Logger.info("onNotificationReply %d", latency); + logger.verbose("onNotificationReply %d", latency); } }; diff --git a/push-server/lib/stats/topicOnline.js b/push-server/lib/stats/topicOnline.js new file mode 100644 index 0000000..062d938 --- /dev/null +++ b/push-server/lib/stats/topicOnline.js @@ -0,0 +1,69 @@ +module.exports = topicOnline; + +var logger = require('../log/index.js')('topicOnline'); + +function filterTopic(topic, filterArray) { + if (!filterArray || !topic) { + return false; + } + for (var i = 0; i < filterArray.length; i++) { + if (topic.startsWith(filterArray[i])) { + return true; + } + } + return false; +} + +function topicOnline(redis, io, id, filterTopics) { + if (!(this instanceof topicOnline)) return new topicOnline(redis, io, id, filterTopics); + this.redis = redis; + this.io = io; + this.id = id; + this.filters = filterTopics; + this.interval = 10000; + this.timeValidWithIn = 20000; + var self = this; + setInterval(function () { + var result = self.io.nsps['/'].adapter.rooms; + for(key in result) { + if (result[key].length > 0 && filterTopic(key, self.filters)) { + var json = {length: result[key].length, time: Date.now()}; + logger.debug("writing topicOnline %s %j", key, json); + self.redis.hset("stats#topicOnline#" + key, self.id, JSON.stringify(json)); + } + }; + }, this.interval); +} + +topicOnline.prototype.writeTopicOnline = function (data) { + var self = this; + for(key in data) { + if (data[key].length > 0 && filterTopic(key, self.filters)) { + var json = {length: data[key].length, time: Date.now()}; + self.redis.hset("stats#topicOnline#" + key, self.id, JSON.stringify(json)); + } + }; +} + +topicOnline.prototype.getTopicOnline = function (topic, callback) { + var count = 0; + var self = this; + this.redis.hgetall("stats#topicOnline#" + topic, function (err, result) { + if (result) { + var delKey = []; + for(key in result) { + var data = JSON.parse(result[key]); + if ((data.time + self.timeValidWithIn) < Date.now()) { + delKey.push(key); + } else { + count = count + data.length; + } + }; + if (delKey.length > 0) { + self.redis.hdel("stats#topicOnline#" + topic, delKey); + } + } + console.log("result count: " + count); + callback(count); + }); +} \ No newline at end of file diff --git a/push-server/lib/util/util.js b/push-server/lib/util/util.js index e446d2f..1bc50c2 100644 --- a/push-server/lib/util/util.js +++ b/push-server/lib/util/util.js @@ -1,5 +1,11 @@ module.exports = { getByHash: function (array, key) { + if(!array || array.length == 0){ + return; + } + if(array.length == 1){ + return array[0]; + } var hash = 0; if (key.length == 0) return hash; for (var i = 0; i < key.length; i++) { diff --git a/push-server/package.json b/push-server/package.json index 22d9ea9..fb7ccae 100644 --- a/push-server/package.json +++ b/push-server/package.json @@ -1,6 +1,6 @@ { "name": "socket.io-push", - "version": "0.0.86", + "version": "0.0.99", "description": "socket.io-push server by xuduo", "main": "index.js", "bin": { @@ -34,5 +34,8 @@ "chai": "^3.5.0", "mocha": "2.3.4", "socket.io-client": "1.4.5" + }, + "scripts": { + "test": "find ./test -name '*.js' | xargs -n1 mocha" } } diff --git a/push-server/test/apiAuth.js b/push-server/test/apiAuth.js index a71670e..a0f77e5 100644 --- a/push-server/test/apiAuth.js +++ b/push-server/test/apiAuth.js @@ -4,12 +4,6 @@ var apiUrl = 'http://localhost:' + config.api_port; var chai = require('chai'); var expect = chai.expect; -var apiCheck = function (path, req) { - debug("req %j", req); - return true; -} - -var pushServer = require('../lib/push-server.js')(config); describe('api auth', function () { @@ -24,12 +18,22 @@ describe('api auth', function () { }) .set('Accept', 'application/json') .end(function (err, res) { - expect(res.text).to.be.equal('{"code":"success"}'); + expect(JSON.parse(res.text).code).to.be.equal("success"); done(); }); }); + var pushServer = require('../lib/push-server.js')(config); + + it('check should not pass', function (done) { + + var apiCheckDenyAll = function (path, req) { + return false; + } + + pushServer.restApi.apiAuth = apiCheckDenyAll; + request .post(apiUrl + '/api/push') .send({ @@ -40,11 +44,66 @@ describe('api auth', function () { }) .set('Accept', 'application/json') .end(function (err, res) { - expect(res.text).to.be.equal('{"code":"error"}'); + expect(JSON.parse(res.text).code).to.be.equal("error"); done(); }); }); + it('check ip', function (done) { + + var ipList = ['127.0.0.1', '127.0.0.2']; + var apiCheckIp = function (path, req, logger) { + var ip = req.headers['x-real-ip'] || req.connection.remoteAddress; + logger.debug("%s caller ip %s", path, ip); + if (req.params.pushAll == 'true') { + return ipList.indexOf(ip) != -1; + } else { + return true; + } + } + + pushServer.restApi.apiAuth = apiCheckIp; + + request + .post(apiUrl + '/api/push') + .send({ + pushId: '', + pushAll: 'true', + topic: 'message', + data: 'test' + }) + .set('Accept', 'application/json') + .end(function (err, res) { + expect(JSON.parse(res.text).code).to.be.equal("error"); + }); + + request + .post(apiUrl + '/api/push') + .send({ + pushId: 'test', + topic: 'message', + data: 'test' + }) + .set('Accept', 'application/json') + .end(function (err, res) { + expect(JSON.parse(res.text).code).to.be.equal("success"); + }); + + request + .post(apiUrl + '/api/push') + .send({ + pushId: '', + pushAll: 'true', + topic: 'message', + data: 'test' + }) + .set('X-Real-IP', '127.0.0.2') + .set('Accept', 'application/json') + .end(function (err, res) { + expect(JSON.parse(res.text).code).to.be.equal("success"); + done(); + }); + }); }); diff --git a/push-server/test/index.js b/push-server/test/index.js index 3183c16..852f530 100644 --- a/push-server/test/index.js +++ b/push-server/test/index.js @@ -1,13 +1,17 @@ var request = require('superagent'); var config = require('../config.js'); +var oldApiPort = config.api_port; +config.api_port = 0; var pushService = require('../lib/push-server.js')(config); -var apiUrl = 'http://localhost:' + config.api_port; var pushClient = require('../lib/push-client.js')('http://localhost:' + config.io_port, {transports: ['websocket', 'polling']}); +config.io_port = config.io_port + 1; +config.api_port = oldApiPort; +var apiService = require('../lib/push-server.js')(config); +var apiUrl = 'http://localhost:' + config.api_port; + -var chai = require('chai') - , spies = require('chai-spies'); +var chai = require('chai'); -chai.use(spies); var expect = chai.expect; describe('长连接Socket IO的测试', function () { @@ -23,25 +27,23 @@ describe('长连接Socket IO的测试', function () { var b = new Buffer('{ "message":"ok"}'); var data = b.toString('base64'); - var messageCallback = function(topic,data){ + var messageCallback = function (topic, data) { expect(topic).to.be.equal('message'); expect(data.message).to.be.equal('ok'); + done(); } - var spy = chai.spy(messageCallback); - pushClient.event.on('message',spy); + pushClient.event.on('message', messageCallback); request .post(apiUrl + '/api/push') .send({ pushId: '', pushAll: 'true', topic: 'message', - data:data + data: data }) .set('Accept', 'application/json') .end(function (err, res) { expect(res.text).to.be.equal('{"code":"success"}'); - expect(spy).to.have.been.called(); - done(); }); }); @@ -54,12 +56,12 @@ describe('长连接Socket IO的测试', function () { } var str = JSON.stringify(data); - var notificationCallback = function(data){ + var notificationCallback = function (data) { expect(data.android.title).to.be.equal(title); expect(data.android.message).to.be.equal(message); + done(); } - var spy = chai.spy(notificationCallback); - pushClient.event.on('notification',spy); + pushClient.event.on('notification', notificationCallback); //leave topic pushClient.unsubscribeTopic("message"); @@ -69,14 +71,12 @@ describe('长连接Socket IO的测试', function () { .send({ pushId: '', pushAll: 'true', - uid:'', - notification:str + uid: '', + notification: str }) .set('Accept', 'application/json') .end(function (err, res) { expect(res.text).to.be.equal('{"code":"success"}'); - expect(spy).to.have.been.called(); - done(); }); }); diff --git a/push-server/test/topicOnline.js b/push-server/test/topicOnline.js new file mode 100644 index 0000000..6da55a9 --- /dev/null +++ b/push-server/test/topicOnline.js @@ -0,0 +1,43 @@ +var config = require('../config.js'); + +var redis = require('redis').createClient(); +var io = require('socket.io'); +var topicOnline = require('../lib/stats/topicOnline.js')(redis, io, 'Ys7Gh2NwDY9Dqti92ZwxJh8ymQL4mmZ2 ', ['topic:', 'message']); +var topicOnline1 = require('../lib/stats/topicOnline.js')(redis, io, 'Ys7Gh2NwDY9Dqti92ZwxJh8ymQL4mmZ3 ', ['topic:', 'message']); + +var chai = require('chai'); +var expect = chai.expect; + +describe('api topicOnline', function () { + + var data = {"topic:Test1" : { length: 3}, "testTopic2" : { length:4}}; + + it('Test topicOnline', function (done) { + topicOnline.writeTopicOnline(data); + topicOnline1.writeTopicOnline(data); + setTimeout(function(){ + topicOnline.getTopicOnline('topic:Test1', function(result){ + expect(result).to.be.equal(6); + topicOnline.getTopicOnline('xxxx', function(result){ + expect(result).to.be.equal(0); + topicOnline.getTopicOnline('testTopic2', function(result){ + expect(result).to.be.equal(0); + done(); + }); + }); + }); + }, 1000); + + }); + + it('Test topicOnline data timeOut', function(done){ + topicOnline.timeValidWithIn = 500; + setTimeout(function(){ + topicOnline.getTopicOnline('topic:Test1', function(result){ + expect(result).to.be.equal(0); + done(); + }); + }, 1000); + }); + +});