taosdata/TDengine

tdengine 使用 python SQLAlchemy 连接池 是否一定需要安装客户端才能使用呢?

Open

#32,863 opened on Sep 4, 2025

View on GitHub
 (6 comments) (0 reactions) (1 assignee)C (24,849 stars) (5,002 forks)batch import
help wantedquestion

Description

我发现仅仅在虚拟环境中执行:pip3 install taospy[ws] 同时安装 taospy 和 taos-ws-py

并不能运行官方提供的代码,这个是为什么?

我必须要安装客户端吗?

import taosws
from sqlalchemy import create_engine
from sqlalchemy import text
import threading

# Create a SQLAlchemy engine with WebSocket connection
# If using native connection, use `taos` instead of `taosws`
engine = create_engine(url="taosws://root:taosdata@?hosts=localhost:6041,127.0.0.1:6041&timezone=Asia/Shanghai", pool_size=10, max_overflow=20)

def init_db():
    try:
        with engine.begin() as conn:
            # create database
            conn.execute(text("DROP DATABASE IF EXISTS power"))
            conn.execute(text("CREATE DATABASE IF NOT EXISTS power"))
            print(f"Create database power successfully")

            # create super table
            conn.execute(
                text("CREATE TABLE IF NOT EXISTS power.meters (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(64))")
            )
            print(f"Create stable power.meters successfully")

    except Exception as err:
        print(f"Failed to create db and table; ErrMessage:{err}")
        raise

def ws_insert_sql(i: int):
    try:
        with engine.begin() as conn:
            sql = text(f"""
                INSERT INTO 
                power.d1001 USING power.meters (groupid, location) TAGS(2, 'California.SanFrancisco')
                    VALUES (NOW + {i+1}a, 10.30000, 219, 0.31000) 
                    (NOW + {i+2}a, 12.60000, 218, 0.33000) (NOW + {i+3}a, 12.30000, 221, 0.31000)
                power.d1002 USING power.meters (groupid, location)  TAGS(3, 'California.SanFrancisco') 
                    VALUES (NOW + {i+1}a, 10.30000, 218, 0.25000)
                """)
            affectedRows = conn.execute(sql)
            print(f"Successfully inserted {affectedRows.cursor.row_count} rows to power.meters.")

    except Exception as err:
        print(f"Failed to insert data to power.meters; ErrMessage:{err}")
        raise

# Use connection pool to execute queries
def ws_query(sql: str):
    try:
        # Get connection from pool
        with engine.begin() as conn:
            # Execute SQL
            result = conn.execute(text(sql))
            # Get results
            data = result.fetchall()
            print(f"Query result: {data}")
            return data
    except Exception as e:
        print(f"TDengine query failed: {e}")
        raise

if __name__ == "__main__":
    init_db()  # Initialize database and tables
    threads = []
    for i in range(5):
        t1 = threading.Thread(target=ws_insert_sql, args=(i*10,))
        t2 = threading.Thread(target=ws_query, args=("SELECT * FROM power.meters",))
        threads.extend([t1, t2])
        t1.start()
        t2.start()

    for t in threads:
        t.join()

    data = ws_query("SELECT count(*) FROM power.meters")
    assert data[0][0] == 20, "Expected 20 rows in power.meters"
    print("All sub-threads completed, main thread ending")

Contributor guide