Skip to content

SQLite

SQLite DB is the database used by the agents and tools. It is used to store the conversations and context messages.

Initialize SQLite

Create a new SQLite database and tables.

make init-sqlite-db

SQLite Interface

director.db.sqlite.db.SQLiteDB

SQLiteDB(db_path='director.db')

Bases: BaseDB

Parameters:

Name Type Description Default
db_path str

Path to the SQLite database file.

'director.db'
Source code in backend/director/db/sqlite/db.py
def __init__(self, db_path: str = "director.db"):
    """
    :param db_path: Path to the SQLite database file.
    """
    self.db_type = DBType.SQLITE
    self.db_path = db_path
    self.conn = sqlite3.connect(self.db_path, check_same_thread=True)
    self.conn.row_factory = sqlite3.Row
    self.cursor = self.conn.cursor()
    logger.info("Connected to SQLite DB...")

create_session

create_session(
    session_id,
    video_id,
    collection_id,
    created_at=None,
    updated_at=None,
    metadata={},
    **kwargs
)

Create a new session.

Parameters:

Name Type Description Default
session_id str

Unique session ID.

required
video_id str

ID of the video associated with the session.

required
collection_id str

ID of the collection associated with the session.

required
created_at int

Timestamp when the session was created.

None
updated_at int

Timestamp when the session was last updated.

None
metadata dict

Additional metadata for the session.

{}
Source code in backend/director/db/sqlite/db.py
def create_session(
    self,
    session_id: str,
    video_id: str,
    collection_id: str,
    created_at: int = None,
    updated_at: int = None,
    metadata: dict = {},
    **kwargs,
) -> None:
    """Create a new session.

    :param session_id: Unique session ID.
    :param video_id: ID of the video associated with the session.
    :param collection_id: ID of the collection associated with the session.
    :param created_at: Timestamp when the session was created.
    :param updated_at: Timestamp when the session was last updated.
    :param metadata: Additional metadata for the session.
    """
    created_at = created_at or int(time.time())
    updated_at = updated_at or int(time.time())

    self.cursor.execute(
        """
    INSERT OR IGNORE INTO sessions (session_id, video_id, collection_id, created_at, updated_at, metadata)
    VALUES (?, ?, ?, ?, ?, ?)
    """,
        (
            session_id,
            video_id,
            collection_id,
            created_at,
            updated_at,
            json.dumps(metadata),
        ),
    )
    self.conn.commit()

get_session

get_session(session_id)

Get a session by session_id.

Parameters:

Name Type Description Default
session_id str

Unique session ID.

required

Returns:

Type Description
dict

Session data as a dictionary.

Source code in backend/director/db/sqlite/db.py
def get_session(self, session_id: str) -> dict:
    """Get a session by session_id.

    :param session_id: Unique session ID.
    :return: Session data as a dictionary.
    :rtype: dict
    """
    self.cursor.execute(
        "SELECT * FROM sessions WHERE session_id = ?", (session_id,)
    )
    row = self.cursor.fetchone()
    if row is not None:
        session = dict(row)  # Convert sqlite3.Row to dictionary
        session["metadata"] = json.loads(session["metadata"])
        return session

    else:
        return {}  # Return an empty dictionary if no data found

get_sessions

get_sessions()

Get all sessions.

Returns:

Type Description
list

List of all sessions.

Source code in backend/director/db/sqlite/db.py
def get_sessions(self) -> list:
    """Get all sessions.

    :return: List of all sessions.
    :rtype: list
    """
    self.cursor.execute("SELECT * FROM sessions ORDER BY updated_at DESC")
    row = self.cursor.fetchall()
    sessions = [dict(r) for r in row]
    for s in sessions:
        s["metadata"] = json.loads(s["metadata"])
    return sessions

add_or_update_msg_to_conv

add_or_update_msg_to_conv(
    session_id,
    conv_id,
    msg_id,
    msg_type,
    agents,
    actions,
    content,
    status=None,
    created_at=None,
    updated_at=None,
    metadata={},
    **kwargs
)

Add a new message (input or output) to the conversation.

Parameters:

Name Type Description Default
session_id str

Unique session ID.

required
conv_id str

Unique conversation ID.

required
msg_id str

Unique message ID.

required
msg_type str

Type of message (input or output).

required
agents list

List of agents involved in the conversation.

required
actions list

List of actions taken by the agents.

required
content list

List of message content.

required
status str

Status of the message.

None
created_at int

Timestamp when the message was created.

None
updated_at int

Timestamp when the message was last updated.

None
metadata dict

Additional metadata for the message.

{}
Source code in backend/director/db/sqlite/db.py
def add_or_update_msg_to_conv(
    self,
    session_id: str,
    conv_id: str,
    msg_id: str,
    msg_type: str,
    agents: List[str],
    actions: List[str],
    content: List[dict],
    status: str = None,
    created_at: int = None,
    updated_at: int = None,
    metadata: dict = {},
    **kwargs,
) -> None:
    """Add a new message (input or output) to the conversation.

    :param str session_id: Unique session ID.
    :param str conv_id: Unique conversation ID.
    :param str msg_id: Unique message ID.
    :param str msg_type: Type of message (input or output).
    :param list agents: List of agents involved in the conversation.
    :param list actions: List of actions taken by the agents.
    :param list content: List of message content.
    :param str status: Status of the message.
    :param int created_at: Timestamp when the message was created.
    :param int updated_at: Timestamp when the message was last updated.
    :param dict metadata: Additional metadata for the message.
    """
    created_at = created_at or int(time.time())
    updated_at = updated_at or int(time.time())

    self.cursor.execute(
        """
    INSERT OR REPLACE INTO conversations (session_id, conv_id, msg_id, msg_type, agents, actions, content, status, created_at, updated_at, metadata)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """,
        (
            session_id,
            conv_id,
            msg_id,
            msg_type,
            json.dumps(agents),
            json.dumps(actions),
            json.dumps(content),
            status,
            created_at,
            updated_at,
            json.dumps(metadata),
        ),
    )
    self.conn.commit()

get_context_messages

get_context_messages(session_id)

Get context messages for a session.

Parameters:

Name Type Description Default
session_id str

Unique session ID.

required

Returns:

Type Description
list

List of context messages.

Source code in backend/director/db/sqlite/db.py
def get_context_messages(self, session_id: str) -> list:
    """Get context messages for a session.

    :param str session_id: Unique session ID.
    :return: List of context messages.
    :rtype: list
    """
    self.cursor.execute(
        "SELECT context_data FROM context_messages WHERE session_id = ?",
        (session_id,),
    )
    result = self.cursor.fetchone()
    return json.loads(result[0]) if result else {}

add_or_update_context_msg

add_or_update_context_msg(
    session_id,
    context_messages,
    created_at=None,
    updated_at=None,
    metadata={},
    **kwargs
)

Update context messages for a session.

Parameters:

Name Type Description Default
session_id str

Unique session ID.

required
context_messages List

List of context messages.

required
created_at int

Timestamp when the context messages were created.

None
updated_at int

Timestamp when the context messages were last updated.

None
metadata dict

Additional metadata for the context messages.

{}
Source code in backend/director/db/sqlite/db.py
def add_or_update_context_msg(
    self,
    session_id: str,
    context_messages: list,
    created_at: int = None,
    updated_at: int = None,
    metadata: dict = {},
    **kwargs,
) -> None:
    """Update context messages for a session.

    :param str session_id: Unique session ID.
    :param List context_messages: List of context messages.
    :param int created_at: Timestamp when the context messages were created.
    :param int updated_at: Timestamp when the context messages were last updated.
    :param dict metadata: Additional metadata for the context messages.
    """
    created_at = created_at or int(time.time())
    updated_at = updated_at or int(time.time())

    self.cursor.execute(
        """
    INSERT OR REPLACE INTO context_messages (context_data, session_id, created_at, updated_at, metadata)
    VALUES (?, ?, ?, ?, ?)
    """,
        (
            json.dumps(context_messages),
            session_id,
            created_at,
            updated_at,
            json.dumps(metadata),
        ),
    )
    self.conn.commit()

delete_conversation

delete_conversation(session_id)

Delete all conversations for a given session.

Parameters:

Name Type Description Default
session_id str

Unique session ID.

required

Returns:

Type Description
bool

True if conversations were deleted, False otherwise.

Source code in backend/director/db/sqlite/db.py
def delete_conversation(self, session_id: str) -> bool:
    """Delete all conversations for a given session.

    :param str session_id: Unique session ID.
    :return: True if conversations were deleted, False otherwise.
    """
    self.cursor.execute(
        "DELETE FROM conversations WHERE session_id = ?", (session_id,)
    )
    self.conn.commit()
    return self.cursor.rowcount > 0

delete_context

delete_context(session_id)

Delete context messages for a given session.

Parameters:

Name Type Description Default
session_id str

Unique session ID.

required

Returns:

Type Description
bool

True if context messages were deleted, False otherwise.

Source code in backend/director/db/sqlite/db.py
def delete_context(self, session_id: str) -> bool:
    """Delete context messages for a given session.

    :param str session_id: Unique session ID.
    :return: True if context messages were deleted, False otherwise.
    """
    self.cursor.execute(
        "DELETE FROM context_messages WHERE session_id = ?", (session_id,)
    )
    self.conn.commit()
    return self.cursor.rowcount > 0

delete_session

delete_session(session_id)

Delete a session and all its associated data.

Parameters:

Name Type Description Default
session_id str

Unique session ID.

required

Returns:

Type Description
bool

True if the session was deleted, False otherwise.

Source code in backend/director/db/sqlite/db.py
def delete_session(self, session_id: str) -> bool:
    """Delete a session and all its associated data.

    :param str session_id: Unique session ID.
    :return: True if the session was deleted, False otherwise.
    """
    failed_components = []
    if not self.delete_conversation(session_id):
        failed_components.append("conversation")
    if not self.delete_context(session_id):
        failed_components.append("context")
    self.cursor.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
    self.conn.commit()
    if not self.cursor.rowcount > 0:
        failed_components.append("session")
    success = len(failed_components) < 3
    return success, failed_components

health_check

health_check()

Check if the SQLite database is healthy and the necessary tables exist. If not, create them.

Source code in backend/director/db/sqlite/db.py
def health_check(self) -> bool:
    """Check if the SQLite database is healthy and the necessary tables exist. If not, create them."""
    try:
        query = """
            SELECT COUNT(name)
            FROM sqlite_master
            WHERE type='table'
            AND name IN ('sessions', 'conversations', 'context_messages');
        """
        self.cursor.execute(query)
        table_count = self.cursor.fetchone()[0]
        if table_count < 3:
            logger.info("Tables not found. Initializing SQLite DB...")
            initialize_sqlite(self.db_path)
        return True

    except Exception as e:
        logger.exception(f"SQLite health check failed: {e}")
        return False