runner.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. #!/usr/bin/env python
  2. from redistimeseries.client import Client as RedisTimeSeries
  3. import redis
  4. import time
  5. redis_host = "redis"
  6. redis_port = 6379
  7. rts = RedisTimeSeries(host=redis_host, port=redis_port)
  8. pool = redis.ConnectionPool(host=redis_host, port=redis_port)
  9. r = redis.Redis(connection_pool=pool)
  10. try:
  11. r.xadd("mystream", {'event_type': 'startup', 'user': 'root'})
  12. r.xgroup_create("mystream", "consumerGroup", '$')
  13. except:
  14. print("group already exists")
  15. while True:
  16. # Retry
  17. try:
  18. msgs = r.xreadgroup("consumerGroup", "consumerName", streams={"mystream": '>'}, count=10, block=1000, noack=False)
  19. except:
  20. print("Could not create readgroup")
  21. time.sleep(5)
  22. continue
  23. for msg in msgs:
  24. for m in msg[1]:
  25. evnt = m[1]['event_type']
  26. try:
  27. rts.info(evnt)
  28. except:
  29. rts.create(evnt, retention_msecs=60000, labels={'event_type': evnt})
  30. rts.create(evnt+"_minute", retention_msecs=0, labels={'event_type': evnt})
  31. rts.createrule(evnt, evnt+"_minute", 'count', 60)
  32. rts.incrby(evnt,1)