from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
# Set up connection details
CASSANDRA_HOSTS = ['127.0.0.1'] # replace with your Cassandra host IP(s)
KEYSPACE = 'my_keyspace' # replace with your keyspace
USERNAME = 'cassandra' # default username
PASSWORD = 'cassandra' # default password
def connect_to_cassandra():
try:
# Authentication (optional depending on your setup)
auth_provider = PlainTextAuthProvider(USERNAME, PASSWORD)
# Create cluster and session
cluster = Cluster(CASSANDRA_HOSTS, auth_provider=auth_provider)
session = cluster.connect()
# Set the keyspace
session.set_keyspace(KEYSPACE)
print(f"✅ Connected to Cassandra keyspace: {KEYSPACE}")
return session
except Exception as e:
print(f"❌ Failed to connect to Cassandra: {e}")
return None
# Example usage
if __name__ == "__main__":
session = connect_to_cassandra()
if session:
# Run a simple query (e.g., list tables)
rows = session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name = %s", [KEYSPACE])
print("📄 Tables in keyspace:")
for row in rows:
print(f"- {row.table_name}")
rows2 = session.execute("select * from "+row.table_name)
for row in rows2:
print(row)
hi
hi