Docker CI: Python 测试 Redis
一、概述
二、Docker 安装运行 Redis
三、Python 安装 Redis 库
三、Python 运行 Redis 的 API
四、Python 创建 Redis 连接方式
五、Python 测试 Redis
Docker CI: Python 测试 Redis
一、概述
基于 Docker 集成 CI 环境。涉及技术:Linux(Ubuntu 14.04), Docker, Jenkins, Git/Gitlab, Web/Httpbin, Python/Pytest, UI/Selenium, Robotframework, Grid Server, Appium 等。
二、Docker 安装运行 Redis
# docker pull redis
# docker run --name redis -p 6379:6379 -d redis redis-server --appendonly yes
# docker exec -it redis /bin/bash
root@79163ac4b344:/data# redis-cli
127.0.0.1:6379> set name allan
OK
127.0.0.1:6379> get name
"allan"
三、Python 安装 Redis 库
# pip install redis
1
三、Python 运行 Redis 的 API
redis-py 的API的使用可以分类为:
连接方式
连接池
操作操作
String 操作:字符串、整数和浮点元素
Hash 操作:哈希。PHP 关联数组;Python 字典
List 操作:列表。Python 元组
Set 操作:集合。集合中的元素唯一
Sort Set 操作:有序集合。元素有分值,用于排序
管道
发布订阅
四、Python 创建 Redis 连接方式
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数 Redis,这样就可以实现多个Redis实例共享一个连接池。
# coding=utf-8
import os
import redis
class RedisAPI(object):
def __init__(self):
self.project_dir = os.getcwd()
self.project = os.path.join(self.project_dir, 'test_data', 'project.csv')
self.ip = None
self.port = None
self.pool = None
def get_redis_info(self):
# Read Project Info from CSV file
project_info = read_csv_file(self.project)
self.ip, self.port = project_info[0]['url'], project_info[0]['port']
def connect_to_redis(self, db=0):
"""
Returns redis connection.
Arguments:
| db | Redis db Id. |
Return:
| rds | redis connection. |
Example:
| rds = connect_to_redis() |
"""
if self.ip is None or self.port is None:
self.get_redis_info()
return redis.Redis(host=self.ip, port=self.port, db=db)
def get_redis_pool(self, db=0):
"""
Returns redis from redis pool.
Arguments:
| db | Redis db Id. |
Return:
| rds | redis connection from redis pool. |
Example:
| rds = get_redis_pool() |
"""
if self.pool is None:
if self.ip is None or self.port is None:
self.get_redis_info()
self.pool = redis.ConnectionPool(host=self.ip, port=self.port, db=db)
return redis.Redis(connection_pool=self.pool)
def read_csv_file(self, filename):
"""
Returns the values from the selected csv file.
Arguments:
| filename | Get data from the selected csv file. |
Return:
| data | The data will be formed as dict. |
Example:
| data = read_csv_file(filename) |
"""
reload(sys)
sys.setdefaultencoding('gbk')
with open(filename, 'rb') as csvfile:
data = [each for each in csv.DictReader(csvfile)]
return data
csv 文档
五、Python 测试 Redis
# coding=utf-8
from RedisAPI import RedisAPI
redis_api = RedisAPI()
r = redis_api.get_redis_pool()
r.set('foo', 'Bar')
print r.get('foo')
print r.get('foo')
rds = redis_api.get_redis_pool()
rds1 = redis_api.get_redis_pool()
rds.set('name', 'allan')
rds1.set('name', 'Sheila')
print rds.get('name')
结果
127.0.0.1:6379>
127.0.0.1:6379> get foo
"Bar"
127.0.0.1:6379> get name
"Sheila"
127.0.0.1:6379>