#!/usr/bin/env python
"""Count stream events per type in RedisTimeSeries.

Reads entries from the Redis stream ``mystream`` as a member of the consumer
group ``consumerGroup``.  For every entry carrying an ``event_type`` field the
script makes sure a time series named after that event type exists (plus a
companion ``<event_type>_minute`` series fed by a ``count`` compaction rule)
and increments the series by one.  Entries are acknowledged once they have
been handled so the group's pending-entries list does not grow forever.
"""
import time

import redis
from redistimeseries.client import Client as RedisTimeSeries

redis_host = "redis"
redis_port = 6379

STREAM_NAME = "mystream"
GROUP_NAME = "consumerGroup"
CONSUMER_NAME = "consumerName"
RETRY_DELAY_SECS = 5

rts = RedisTimeSeries(host=redis_host, port=redis_port)
# decode_responses=True makes stream field names/values arrive as text, so
# the 'event_type' lookup below works on Python 3 (the default is raw bytes).
pool = redis.ConnectionPool(
    host=redis_host, port=redis_port, decode_responses=True
)
r = redis.Redis(connection_pool=pool)


def publish_startup_event():
    """Append the one-off 'startup' marker entry to the stream (best effort)."""
    try:
        r.xadd(STREAM_NAME, {'event_type': 'startup', 'user': 'root'})
    except redis.RedisError as err:
        print("could not publish startup event: %s" % err)


def ensure_consumer_group():
    """Create the consumer group if needed; return True when it is usable.

    An already existing group (Redis answers with a BUSYGROUP error) counts
    as success.  Any other failure is reported and returns False so that the
    caller can retry later instead of reading from a group that is missing.
    """
    try:
        # mkstream=True: group creation no longer depends on the stream
        # having been created by an earlier XADD.
        r.xgroup_create(STREAM_NAME, GROUP_NAME, '$', mkstream=True)
    except redis.ResponseError as err:
        if "BUSYGROUP" in str(err):
            print("group already exists")
            return True
        print("could not create consumer group: %s" % err)
        return False
    except redis.RedisError as err:
        print("could not create consumer group: %s" % err)
        return False
    return True


def ensure_series(event_type):
    """Create the time series and its compaction rule for *event_type*.

    Does nothing when the base series already exists.  Connection problems
    are not swallowed here; they propagate to the caller.
    """
    try:
        rts.info(event_type)
    except redis.ResponseError:
        # TS.INFO rejected the key, i.e. the series has not been created yet.
        minute_key = event_type + "_minute"
        rts.create(
            event_type, retention_msecs=60000,
            labels={'event_type': event_type},
        )
        rts.create(
            minute_key, retention_msecs=0,
            labels={'event_type': event_type},
        )
        # NOTE(review): the bucket size is 60 while the destination key is
        # called "_minute"; if the unit is milliseconds (as for
        # retention_msecs) this was probably meant to be 60000 -- confirm
        # before changing, since it alters the stored aggregation.
        rts.createrule(event_type, minute_key, 'count', 60)


def handle_entry(entry_id, fields):
    """Record one stream entry in the time series, then acknowledge it."""
    event_type = fields.get('event_type')
    if event_type:
        ensure_series(event_type)
        rts.incrby(event_type, 1)
    else:
        # Nothing to count; still acknowledged below so it does not linger.
        print("entry %s has no event_type, skipping" % entry_id)
    r.xack(STREAM_NAME, GROUP_NAME, entry_id)


publish_startup_event()
group_ready = ensure_consumer_group()

while True:
    if not group_ready:
        # Redis was unreachable or the group is missing: back off and retry.
        time.sleep(RETRY_DELAY_SECS)
        group_ready = ensure_consumer_group()
        continue

    try:
        msgs = r.xreadgroup(
            GROUP_NAME, CONSUMER_NAME, streams={STREAM_NAME: '>'},
            count=10, block=1000, noack=False,
        )
    except redis.RedisError as err:
        print("Could not read from consumer group: %s" % err)
        time.sleep(RETRY_DELAY_SECS)
        # The group may have vanished (e.g. stream deleted); re-create it.
        group_ready = ensure_consumer_group()
        continue

    for _stream, entries in msgs:
        for entry_id, fields in entries:
            try:
                handle_entry(entry_id, fields)
            except redis.RedisError as err:
                # Entry stays unacknowledged (visible via XPENDING) rather
                # than being silently dropped or crashing the consumer.
                print("failed to process entry %s: %s" % (entry_id, err))