JavaEE鸿蒙应用开发HTML&JS+前端Python+大数据开发人工智能开发AI+设计软件测试新媒体+短视频直播运营产品经理集成电路应用开发(含嵌入式)Linux云计算+运维开发C/C++拍摄剪辑+短视频制作PMP项目管理认证电商运营Go语言与区块链大数据PHP工程师Android+物联网iOS.NET

【python】Docker CI: Python 测试 Redis

来源:黑马程序员

浏览21297人

2019.05.07

Docker CI: Python 测试 Redis

一、概述
二、Docker 安装运行 Redis
三、Python 安装 Redis 库
三、Python 运行 Redis 的 API
四、Python 创建 Redis 连接方式
五、Python 测试 Redis
Docker CI: Python 测试 Redis
一、概述
基于 Docker 集成 CI 环境。涉及技术:Linux(Ubuntu 14.04), Docker, Jenkins, Git/Gitlab, Web/Httpbin, Python/Pytest, UI/Selenium, Robotframework, Grid Server, Appium 等。


二、Docker 安装运行 Redis
# docker pull redis
# docker run --name redis -p 6379:6379 -d redis redis-server --appendonly yes
# docker exec -it redis /bin/bash
root@79163ac4b344:/data# redis-cli
127.0.0.1:6379> set name allan
OK
127.0.0.1:6379> get name
"allan"

三、Python 安装 Redis 库
# pip install redis
1
三、Python 运行 Redis 的 API
redis-py 的API的使用可以分类为:

连接方式
连接池
操作操作
String 操作:字符串、整数和浮点元素
Hash 操作:哈希。PHP 关联数组;Python 字典
List 操作:列表。Python 元组
Set 操作:集合。集合中的元素唯一
Sort Set 操作:有序集合。元素有分值,用于排序

管道
发布订阅
四、Python 创建 Redis 连接方式
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数 Redis,这样就可以实现多个Redis实例共享一个连接池。

# coding=utf-8

import os
import redis


class RedisAPI(object):
    def __init__(self):
        self.project_dir = os.getcwd()
        self.project = os.path.join(self.project_dir, 'test_data', 'project.csv')
        self.ip = None
        self.port = None
        self.pool = None


    def get_redis_info(self):
        # Read Project Info from CSV file
        project_info = read_csv_file(self.project)
        self.ip, self.port = project_info[0]['url'], project_info[0]['port']


    def connect_to_redis(self, db=0):
            """
            Returns redis connection.
        
            Arguments:
                    | db | Redis db Id. |
            Return:
                    | rds | redis connection. |
            Example:
                    | rds = connect_to_redis() |
        
            """
        if self.ip is None or self.port is None:
            self.get_redis_info()
        return redis.Redis(host=self.ip, port=self.port, db=db)


    def get_redis_pool(self, db=0):
            """
            Returns redis from redis pool.
        
            Arguments:
                    | db | Redis db Id. |
            Return:
                    | rds | redis connection from redis pool. |
            Example:
                    | rds = get_redis_pool() |
        
            """
        if self.pool is None:
            if self.ip is None or self.port is None:
                self.get_redis_info()
            self.pool = redis.ConnectionPool(host=self.ip, port=self.port, db=db)
        return redis.Redis(connection_pool=self.pool)


        def read_csv_file(self, filename):
            """
            Returns the values from the selected csv file.
        
            Arguments:
                    | filename | Get data from the selected csv file. |
            Return:
                    | data | The data will be formed as dict. |
            Example:
                    | data = read_csv_file(filename) |
        
            """
            reload(sys)
            sys.setdefaultencoding('gbk')
            
            with open(filename, 'rb') as csvfile:
                data = [each for each in csv.DictReader(csvfile)]
                return data

csv 文档


五、Python 测试 Redis
# coding=utf-8

from RedisAPI import RedisAPI

redis_api = RedisAPI()

r = redis_api.get_redis_pool()
r.set('foo', 'Bar')
print r.get('foo')
print r.get('foo')

rds = redis_api.get_redis_pool()
rds1 = redis_api.get_redis_pool()
rds.set('name', 'allan')
rds1.set('name', 'Sheila')
print rds.get('name')

结果

127.0.0.1:6379>
127.0.0.1:6379> get foo
"Bar"
127.0.0.1:6379> get name
"Sheila"
127.0.0.1:6379>