GrafanaDatastoreServer.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. #!/usr/bin/env python
  2. import argparse
  3. import redis
  4. import flask
  5. import pytz
  6. from datetime import timedelta, datetime
  7. import dateutil.parser
  8. from gevent.pywsgi import WSGIServer
  9. from flask import Flask, jsonify
  10. from flask_cors import CORS, cross_origin
  11. app = Flask(__name__)
  12. CORS(app)
  13. timezone = pytz.timezone("UTC")
  14. EPOCH = timezone.localize(datetime(1970, 1, 1, 0, 0, 0))
  15. REDIS_POOL = None
  16. SCAN_TYPE_SCRIPT = """local cursor, pat, typ, cnt = ARGV[1], ARGV[2], ARGV[3], ARGV[4] or 100
  17. local rep = {}
  18. local res = redis.call('SCAN', cursor, 'MATCH', pat, 'COUNT', cnt)
  19. while #res[2] > 0 do
  20. local k = table.remove(res[2])
  21. local t = redis.call('TYPE', k)
  22. if t['ok'] == typ then
  23. table.insert(rep, k)
  24. end
  25. end
  26. rep = {tonumber(res[1]), rep}
  27. return rep"""
  28. @app.route('/')
  29. @cross_origin()
  30. def hello_world():
  31. return 'OK'
  32. @app.route('/search', methods=["POST", 'GET'])
  33. @cross_origin()
  34. def search():
  35. redis_client = redis.Redis(connection_pool=REDIS_POOL)
  36. result = []
  37. cursor = 0
  38. while True:
  39. cursor, keys = redis_client.eval(SCAN_TYPE_SCRIPT, 0, cursor, "*", "TSDB-TYPE", 100)
  40. result.extend([k.decode("ascii") for k in keys])
  41. if cursor == 0:
  42. break
  43. return jsonify(result)
  44. def process_targets(targets, redis_client):
  45. result = []
  46. for target in targets:
  47. if '*' in target:
  48. result.extend([k.decode('ascii') for k in redis_client.keys(target)])
  49. else:
  50. result.append(target)
  51. return result
  52. @app.route('/query', methods=["POST", 'GET'])
  53. def query():
  54. request = flask.request.get_json()
  55. response = []
  56. # !!! dates 'from' and 'to' are expected to be in UTC, which is what Grafana provides here.
  57. # If not in UTC, use pytz to set to UTC timezone and subtract the utcoffset().
  58. # Time delta calculations should always be done in UTC to avoid pitfalls of daylight offset changes.
  59. stime = (dateutil.parser.parse(request['range']['from']) - EPOCH).total_seconds() * 1000
  60. etime = (dateutil.parser.parse(request['range']['to']) - EPOCH).total_seconds() * 1000
  61. redis_client = redis.Redis(connection_pool=REDIS_POOL)
  62. targets = process_targets([t['target'] for t in request['targets']], redis_client)
  63. for target in targets:
  64. args = ['ts.range', target, int(stime), int(etime)]
  65. if 'intervalMs' in request and request['intervalMs'] > 0:
  66. args += ['avg', int(request['intervalMs'])]
  67. print(args)
  68. redis_resp = redis_client.execute_command(*args)
  69. datapoints = [(float(x2.decode("ascii")), x1) for x1, x2 in redis_resp]
  70. response.append(dict(target=target, datapoints=datapoints))
  71. return jsonify(response)
  72. @app.route('/annotations')
  73. def annotations():
  74. return jsonify([])
  75. def main():
  76. global REDIS_POOL
  77. parser = argparse.ArgumentParser()
  78. parser.add_argument("--host", help="server address to listen to", default="0.0.0.0")
  79. parser.add_argument("--port", help="port number to listen to", default=8080, type=int)
  80. parser.add_argument("--redis-server", help="redis server address", default="localhost")
  81. parser.add_argument("--redis-port", help="redis server port", default=6379, type=int)
  82. args = parser.parse_args()
  83. REDIS_POOL = redis.ConnectionPool(host=args.redis_server, port=args.redis_port)
  84. http_server = WSGIServer(('', args.port), app)
  85. http_server.serve_forever()
  86. if __name__ == '__main__':
  87. main()