API Reference

bin

app

A CLI tool for exporting data.

cli()

CLI group.

Source code in datainventory/bin/app.py
 9
10
11
12
# Root click group: the subcommands in this module register themselves on it
# through ``@cli.command()``. The docstring is kept short because click shows
# it as the group's help text.
@click.group()
def cli() -> None:
    """CLI group."""

destroy(device_id, inventory_path)

Destroy the inventory data.

Source code in datainventory/bin/app.py
39
40
41
42
43
44
45
46
47
# ``--inventory_path`` is required: it is converted to ``pathlib.Path`` below,
# and a missing option (``None``) would otherwise surface as a bare
# ``TypeError`` instead of a click usage error. ``--device_id`` stays optional.
@cli.command()
@click.option("--device_id")
@click.option("--inventory_path", required=True)
def destroy(device_id: str, inventory_path: str) -> None:
    """Destroy the inventory data."""
    store = inventory.Inventory(
        device_id=device_id, inventory=pathlib.Path(inventory_path)
    )
    store.destroy()

inventory_export(device_id, inventory_path, export_path)

Export the entire inventory data.

Source code in datainventory/bin/app.py
15
16
17
18
19
20
21
22
23
24
# ``--inventory_path`` and ``--export_path`` are required: both are converted
# to ``pathlib.Path`` below, and a missing option (``None``) would otherwise
# surface as a bare ``TypeError`` instead of a click usage error.
# ``--device_id`` stays optional.
@cli.command()
@click.option("--device_id")
@click.option("--inventory_path", required=True)
@click.option("--export_path", required=True)
def inventory_export(device_id: str, inventory_path: str, export_path: str) -> None:
    """Export the entire inventory data."""
    store = inventory.Inventory(
        device_id=device_id, inventory=pathlib.Path(inventory_path)
    )
    store.export(dest_filename=pathlib.Path(export_path))

inventory_import(device_id, source_path, inventory_path)

Import data into the inventory.

Source code in datainventory/bin/app.py
27
28
29
30
31
32
33
34
35
36
# ``--source_path`` and ``--inventory_path`` are required: both are converted
# to ``pathlib.Path`` below, and a missing option (``None``) would otherwise
# surface as a bare ``TypeError`` instead of a click usage error.
# ``--device_id`` stays optional.
@cli.command()
@click.option("--device_id")
@click.option("--source_path", required=True)
@click.option("--inventory_path", required=True)
def inventory_import(device_id: str, source_path: str, inventory_path: str) -> None:
    """Import data into the inventory."""
    store = inventory.Inventory(
        device_id=device_id, inventory=pathlib.Path(inventory_path)
    )
    store.import_data(source_data=pathlib.Path(source_path))

main()

Entry point.

Source code in datainventory/bin/app.py
50
51
52
def main() -> None:
    """Run the command-line interface by invoking the ``cli`` group."""
    cli()

common

Common code for Data Inventory.

ColumnType

Bases: Enum

Supported custom data type.

Source code in datainventory/common.py
35
36
37
38
39
40
41
42
43
class ColumnType(enum.Enum):
    """Supported custom data type.

    Every member's value is a SQLAlchemy column type, so ``member.value`` can
    be passed straight to ``sqlalchemy.Column`` when a table is declared.
    """

    # The values are the SQLAlchemy type classes themselves, not instances.
    Binary = sqlalchemy.LargeBinary
    Boolean = sqlalchemy.Boolean
    DateTime = sqlalchemy.DateTime
    Float = sqlalchemy.Float
    Integer = sqlalchemy.Integer
    String = sqlalchemy.String

Range

Time range for query data.

Source code in datainventory/common.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Range:
    """Time range for query data.

    A range always has a start and may have an end. The end can be given
    directly or derived from an interval measured from the start.
    """

    def __init__(
        self,
        start: datetime.datetime,
        end: Optional[datetime.datetime] = None,
        interval: Optional[datetime.timedelta] = None,
    ) -> None:
        """Create a range.

        Args:
            start: Beginning of the range.
            end: End of the range, or ``None`` for an open-ended range.
            interval: Length of the range. When given, the end becomes
                ``start + interval`` and takes precedence over ``end``.
        """
        self._start = start
        self._end = end
        # Compare against None rather than truthiness: a zero-length
        # ``timedelta`` is falsy and would otherwise be silently ignored.
        if interval is not None:
            self._end = self._start + interval

    def get_range(self):
        """Return the start and end timestamp as a ``(start, end)`` tuple."""
        return self._start, self._end

get_range()

Return the start and end timestamp.

Source code in datainventory/common.py
30
31
32
def get_range(self):
    """Return the bounds of the range as a ``(start, end)`` pair."""
    start, end = self._start, self._end
    return start, end

inventory

The main module of Data Inventory.

Inventory

Data Inventory.

Source code in datainventory/inventory.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class Inventory:
    """Data Inventory.

    An inventory is a directory containing a SQLite database named after the
    directory (``<name>.db``) and a ``data`` sub-directory. The stores handed
    out by the ``get_*_store`` methods all share the inventory's engine.
    """

    def __init__(self, device_id: str, inventory: pathlib.Path) -> None:
        """Open the inventory rooted at ``inventory``, creating it if needed.

        Args:
            device_id: Identifier passed on to every store created here.
            inventory: Directory that holds the database and the data files.
        """
        self._device_id = device_id
        self._inventory = inventory
        self._data_inventory = self._inventory / "data"
        self._database_name = f"{self._inventory.name}.db"
        self._database = self._inventory / self._database_name

        # parents/exist_ok: nested paths work and there is no window between
        # an exists() check and the mkdir() call.
        self._inventory.mkdir(parents=True, exist_ok=True)
        self._engine = sqlalchemy.create_engine(f"sqlite:///{self._database}")
        self._metadata = sqlalchemy.MetaData()
        self._metadata.create_all(bind=self._engine)

    def get_media_store(self) -> media_store.MediaStore:
        """Return an instance of the media store."""
        Session = sessionmaker(bind=self._engine)
        return media_store.MediaStore(
            create_key=_internal_store.CREATE_KEY,
            device_id=self._device_id,
            session=Session(),
            connection=self._engine.connect(),
            data_inventory=self._data_inventory,
        )

    def get_model_store(self) -> model_store.ModelStore:
        """Return an instance of the model store."""
        Session = sessionmaker(bind=self._engine)
        return model_store.ModelStore(
            create_key=_internal_store.CREATE_KEY,
            device_id=self._device_id,
            session=Session(),
        )

    def get_table_store(self) -> table_store.TableStore:
        """Return an instance of the table store."""
        Session = sessionmaker(bind=self._engine)
        return table_store.TableStore(
            create_key=_internal_store.CREATE_KEY,
            device_id=self._device_id,
            metadata=self._metadata,
            session=Session(),
            connection=self._engine.connect(),
        )

    def export(self, dest_filename: pathlib.Path) -> pathlib.Path:
        """Export the entire data inventory.

        Every table found in the database is dumped to a temporary CSV file
        inside the inventory directory. The CSV files and the ``data``
        directory (when it exists) are then packed into a gzip-compressed
        tarball.

        Args:
            dest_filename: Archive path without the ``.tar.gz`` suffix.

        Returns:
            The path of the archive that was written.
        """
        self._metadata.reflect(self._engine)
        # Keep the directory part of the destination; using only ``.name``
        # would drop it and write the archive into the working directory.
        archive_path = dest_filename.parent / f"{dest_filename.name}.tar.gz"
        session = sessionmaker(bind=self._engine)()
        table_csv_files = []
        try:
            for table_name, table in self._metadata.tables.items():
                table_csv = self._inventory / f"{table_name}.csv"
                # Register before writing so a partial file is cleaned up too.
                table_csv_files.append(table_csv)
                pd.DataFrame(
                    session.query(table).all(), columns=table.columns.keys()
                ).to_csv(path_or_buf=table_csv, index=False)

            with tarfile.open(name=archive_path, mode="w:gz") as tarball:
                # The data directory is not created by this class, so it may
                # be absent.
                if self._data_inventory.exists():
                    tarball.add(name=self._data_inventory)
                for table_csv in table_csv_files:
                    tarball.add(name=table_csv)
        finally:
            # Release the session and remove the temporary CSV files whether
            # or not the export succeeded.
            session.close()
            for table_csv in table_csv_files:
                if table_csv.exists():
                    table_csv.unlink()
        return archive_path

    def destroy(self) -> None:
        """Destroy the entire data inventory.

        Removes the inventory directory and everything in it. Calling this
        when the directory is already gone is a no-op.
        """
        # Release pooled connections so the database file is not held open
        # while its directory is removed.
        self._engine.dispose()
        # A ``Path`` is always truthy, so test for existence explicitly.
        if self._inventory.exists():
            shutil.rmtree(self._inventory)

    def import_data(self, source_data: pathlib.Path) -> None:
        """Import data into the data inventory.

        Raises:
            NotImplementedError: Always; importing is not implemented yet.
        """
        raise NotImplementedError("This function is not implemented.")

destroy()

Destroy the entire data inventory.

Source code in datainventory/inventory.py
88
89
90
91
def destroy(self) -> None:
    """Destroy the entire data inventory.

    Removes the inventory directory and everything in it. Calling this when
    the directory is already gone is a no-op.
    """
    # Release pooled connections so the database file is not held open while
    # its directory is removed.
    self._engine.dispose()
    # A ``Path`` is always truthy, so test for existence explicitly.
    if self._inventory.exists():
        shutil.rmtree(self._inventory)

export(dest_filename)

Export the entire data inventory.

Source code in datainventory/inventory.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def export(self, dest_filename: pathlib.Path) -> pathlib.Path:
    """Export the entire data inventory.

    Every table found in the database is dumped to a temporary CSV file inside
    the inventory directory. The CSV files and the ``data`` directory (when it
    exists) are then packed into a gzip-compressed tarball.

    Args:
        dest_filename: Archive path without the ``.tar.gz`` suffix.

    Returns:
        The path of the archive that was written.
    """
    self._metadata.reflect(self._engine)
    # Keep the directory part of the destination; using only ``.name`` would
    # drop it and write the archive into the working directory.
    archive_path = dest_filename.parent / f"{dest_filename.name}.tar.gz"
    session = sessionmaker(bind=self._engine)()
    table_csv_files = []
    try:
        for table_name, table in self._metadata.tables.items():
            table_csv = self._inventory / f"{table_name}.csv"
            # Register before writing so a partial file is cleaned up too.
            table_csv_files.append(table_csv)
            pd.DataFrame(
                session.query(table).all(), columns=table.columns.keys()
            ).to_csv(path_or_buf=table_csv, index=False)

        with tarfile.open(name=archive_path, mode="w:gz") as tarball:
            # The data directory may never have been created.
            if self._data_inventory.exists():
                tarball.add(name=self._data_inventory)
            for table_csv in table_csv_files:
                tarball.add(name=table_csv)
    finally:
        # Release the session and remove the temporary CSV files whether or
        # not the export succeeded.
        session.close()
        for table_csv in table_csv_files:
            if table_csv.exists():
                table_csv.unlink()
    return archive_path

get_media_store()

Return an instance of the media store.

Source code in datainventory/inventory.py
36
37
38
39
40
41
42
43
44
45
def get_media_store(self) -> media_store.MediaStore:
    """Build a media store bound to this inventory's engine and data directory."""
    session_factory = sessionmaker(bind=self._engine)
    store_session = session_factory()
    store_connection = self._engine.connect()
    return media_store.MediaStore(
        create_key=_internal_store.CREATE_KEY,
        device_id=self._device_id,
        session=store_session,
        connection=store_connection,
        data_inventory=self._data_inventory,
    )

get_model_store()

Return an instance of the model store.

Source code in datainventory/inventory.py
47
48
49
50
51
52
53
54
def get_model_store(self) -> model_store.ModelStore:
    """Build a model store backed by a fresh session on this inventory's engine."""
    session_factory = sessionmaker(bind=self._engine)
    store_session = session_factory()
    return model_store.ModelStore(
        create_key=_internal_store.CREATE_KEY,
        device_id=self._device_id,
        session=store_session,
    )

get_table_store()

Return an instance of the table store.

Source code in datainventory/inventory.py
56
57
58
59
60
61
62
63
64
65
def get_table_store(self) -> table_store.TableStore:
    """Build a table store sharing this inventory's metadata and engine."""
    session_factory = sessionmaker(bind=self._engine)
    store_session = session_factory()
    store_connection = self._engine.connect()
    return table_store.TableStore(
        create_key=_internal_store.CREATE_KEY,
        device_id=self._device_id,
        metadata=self._metadata,
        session=store_session,
        connection=store_connection,
    )

import_data(source_data)

Import data into the data inventory.

Source code in datainventory/inventory.py
93
94
95
def import_data(self, source_data: pathlib.Path) -> None:
    """Import data into the data inventory.

    Raises:
        NotImplementedError: Always; importing is not implemented yet.
    """
    message = "This function is not implemented."
    raise NotImplementedError(message)

media_store

Store for multimedia data such as video, audio, and image.

Media

Bases: Base

Table definition for multimedia.

Source code in datainventory/media_store.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class Media(common.Base):
    """ORM mapping for the ``media`` table.

    ``fullpath`` is the primary key; the remaining columns hold descriptive
    metadata about the file.
    """

    __tablename__ = "media"

    filename = sqlalchemy.Column(sqlalchemy.String)
    fullpath = sqlalchemy.Column(sqlalchemy.String, primary_key=True)
    media_type = sqlalchemy.Column(sqlalchemy.String)
    format = sqlalchemy.Column(sqlalchemy.String)
    device_id = sqlalchemy.Column(sqlalchemy.String)
    created_at = sqlalchemy.Column(sqlalchemy.DateTime)
    size = sqlalchemy.Column(sqlalchemy.Integer)
    duration = sqlalchemy.Column(sqlalchemy.Integer)

    def __repr__(self):
        """Provide a readable representation listing every column value."""
        identity = f"<Media(filename={self.filename}, fullpath={self.fullpath}, "
        kind = f"media_type={self.media_type}, format={self.format}, "
        origin = f"device_id={self.device_id}, created_at={self.created_at}, "
        extent = f"size={self.size}, duration={self.duration})>"
        return identity + kind + origin + extent

__repr__()

Provide a nice representation for the media type.

Source code in datainventory/media_store.py
42
43
44
45
46
47
48
49
def __repr__(self):
    """Provide a readable representation listing every column value."""
    identity = f"<Media(filename={self.filename}, fullpath={self.fullpath}, "
    kind = f"media_type={self.media_type}, format={self.format}, "
    origin = f"device_id={self.device_id}, created_at={self.created_at}, "
    extent = f"size={self.size}, duration={self.duration})>"
    return identity + kind + origin + extent

MediaStore

Bases: InternalStore

Media Store.

Source code in datainventory/media_store.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class MediaStore(_internal_store.InternalStore):
    """Media Store.

    Copies (or moves) media files into the inventory's data directory and
    records one ``Media`` row per file.
    """

    def __init__(
        self,
        create_key,
        device_id: str,
        session: Session,
        connection: sqlalchemy.engine.Connection,
        data_inventory: pathlib.Path,
    ) -> None:
        """Set up the store.

        Args:
            create_key: Passed through to ``InternalStore.__init__``.
            device_id: Passed through to ``InternalStore.__init__``.
            session: Session used for inserts and queries.
            connection: Connection used to create the ``media`` table when it
                does not exist yet.
            data_inventory: Directory that receives the media files; created
                if missing.
        """
        _internal_store.InternalStore.__init__(
            self, create_key=create_key, device_id=device_id
        )
        self._session = session
        Media.__table__.create(bind=connection, checkfirst=True)
        self._data_inventory = data_inventory
        # exist_ok avoids the window between an exists() check and mkdir().
        self._data_inventory.mkdir(parents=True, exist_ok=True)

    def insert_media(
        self, file_path: pathlib.Path, media_type: MediaType, copy: bool = True
    ) -> pathlib.Path:
        """Insert a media.

        Args:
            file_path: File to add to the inventory.
            media_type: Kind of media; its name is stored in the record.
            copy: When true the source file is kept; when false the source is
                removed once the record has been committed (a move).

        Returns:
            Path of the file inside the data directory.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
        """
        if not file_path.exists():
            raise FileNotFoundError(f"{file_path} does not exist!")

        # Use copy2 to preserve the file metadata.
        shutil.copy2(src=file_path, dst=self._data_inventory)

        dest_file = self._data_inventory / pathlib.Path(file_path.name)
        stat = dest_file.stat()
        data = Media(
            filename=dest_file.name,
            fullpath=str(dest_file),
            media_type=media_type.name,
            format=dest_file.suffix,
            device_id=self._device_id,
            created_at=datetime.fromtimestamp(stat.st_ctime, tz=timezone.utc),
            size=stat.st_size,
            duration=0,  # FIXME: get the duration info if it's an audio or a video.
        )
        self._session.add(data)
        try:
            self._session.commit()
        except Exception:
            # Leave the session usable for the caller, then report the failure.
            self._session.rollback()
            raise

        if not copy:
            # Move semantics: drop the source only after the record is safely
            # committed, so a failed insert never loses the original file.
            file_path.unlink()
        return dest_file

    def query_data(
        self, query_statement: Optional[sqlalchemy.sql.Select] = None
    ) -> List:
        """Retrieve the media data.

        Args:
            query_statement: Statement to execute; when omitted every ``Media``
                row is selected.

        Returns:
            All rows produced by the statement.
        """
        # Identity check: a SQLAlchemy statement must not be evaluated in a
        # boolean context (clause elements do not define a truth value).
        if query_statement is None:
            query_statement = sqlalchemy.select(Media)
        result = self._session.execute(query_statement)
        return result.all()  # type: ignore

insert_media(file_path, media_type, copy=True)

Insert a media.

Source code in datainventory/media_store.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def insert_media(
    self, file_path: pathlib.Path, media_type: MediaType, copy: bool = True
) -> pathlib.Path:
    """Insert a media.

    Args:
        file_path: File to add to the inventory.
        media_type: Kind of media; its name is stored in the record.
        copy: When true the source file is kept; when false the source is
            removed once the record has been committed (a move).

    Returns:
        Path of the file inside the data directory.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
    """
    if not file_path.exists():
        raise FileNotFoundError(f"{file_path} does not exist!")

    # Use copy2 to preserve the file metadata.
    shutil.copy2(src=file_path, dst=self._data_inventory)

    dest_file = self._data_inventory / pathlib.Path(file_path.name)
    stat = dest_file.stat()
    data = Media(
        filename=dest_file.name,
        fullpath=str(dest_file),
        media_type=media_type.name,
        format=dest_file.suffix,
        device_id=self._device_id,
        created_at=datetime.fromtimestamp(stat.st_ctime, tz=timezone.utc),
        size=stat.st_size,
        duration=0,  # FIXME: get the duration info if it's an audio or a video.
    )
    self._session.add(data)
    try:
        self._session.commit()
    except Exception:
        # Leave the session usable for the caller, then report the failure.
        self._session.rollback()
        raise

    if not copy:
        # Move semantics: drop the source only after the record is safely
        # committed, so a failed insert never loses the original file.
        file_path.unlink()
    return dest_file

query_data(query_statement=None)

Retrieve the media data.

Source code in datainventory/media_store.py
101
102
103
104
105
106
107
108
109
110
def query_data(
    self, query_statement: Optional[sqlalchemy.sql.Select] = None
) -> List:
    """Retrieve the media data.

    Args:
        query_statement: Statement to execute; when omitted every ``Media``
            row is selected.

    Returns:
        All rows produced by the statement.
    """
    # Identity check: a SQLAlchemy statement must not be evaluated in a boolean
    # context (clause elements do not define a truth value).
    if query_statement is None:
        query_statement = sqlalchemy.select(Media)
    result = self._session.execute(query_statement)
    return result.all()  # type: ignore

MediaType

Bases: Enum

Supported media types.

Source code in datainventory/media_store.py
20
21
22
23
24
25
class MediaType(enum.Enum):
    """Supported media types."""

    # Values are assigned automatically by ``enum.auto()``.
    Audio = enum.auto()
    Image = enum.auto()
    Video = enum.auto()

model_store

Store for machine learning models.

Model

Bases: Base

The table definition of learning models.

Source code in datainventory/model_store.py
15
16
17
18
19
20
21
class Model(common.Base):
    """The table definition of learning models.

    Maps to the ``model`` table; ``name`` is the primary key.
    """

    __tablename__ = "model"

    # The model name is the primary key of the table.
    name = sqlalchemy.Column(sqlalchemy.String, primary_key=True)
    version = sqlalchemy.Column(sqlalchemy.String)

ModelStore

Bases: InternalStore

Model Store.

Source code in datainventory/model_store.py
24
25
26
27
28
29
30
31
32
33
34
35
class ModelStore(_internal_store.InternalStore):
    """Store for machine learning model records."""

    def __init__(self, create_key, device_id: str, session: Session) -> None:
        """Initialise the base store and keep the database session."""
        super().__init__(create_key=create_key, device_id=device_id)
        self._session = session

    def get_model(self, name: str, version: str) -> Tuple:
        """Look up a model by name and version.

        Raises:
            NotImplementedError: Always; the lookup is not implemented yet.
        """
        message = "The function is not implemented."
        raise NotImplementedError(message)

get_model(name, version)

Retrieve the model according to the name and version from database.

Source code in datainventory/model_store.py
33
34
35
def get_model(self, name: str, version: str) -> Tuple:
    """Look up a model by name and version.

    Raises:
        NotImplementedError: Always; the lookup is not implemented yet.
    """
    message = "The function is not implemented."
    raise NotImplementedError(message)

table_store

Store for structured data.

TableStore

Bases: InternalStore

Table Store.

Source code in datainventory/table_store.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class TableStore(_internal_store.InternalStore):
    """Table Store.

    Creates tables on the shared metadata, inserts rows stamped with the
    device id and a UTC timestamp, and reads rows back as data frames.
    """

    def __init__(
        self,
        create_key,
        device_id: str,
        metadata: sqlalchemy.MetaData,
        session: Session,
        connection: sqlalchemy.engine.Connection,
    ) -> None:
        """Set up the store and create any tables already on ``metadata``.

        Args:
            create_key: Passed through to ``InternalStore.__init__``.
            device_id: Passed through to ``InternalStore.__init__``.
            metadata: Metadata object the tables are registered on.
            session: Session used for inserts and queries.
            connection: Connection used to create tables.
        """
        _internal_store.InternalStore.__init__(
            self, create_key=create_key, device_id=device_id
        )
        self._session = session
        self._metadata = metadata
        self._connection = connection
        self._metadata.create_all(bind=self._connection)

    def create_table(
        self, table_name: str, columns: Dict[str, common.ColumnType]
    ) -> None:
        """Create a table.

        ``device_id`` and ``timestamp`` columns are always added on top of the
        requested ones.

        Args:
            table_name: Name of the table to create.
            columns: Mapping of column name to column type. It is not modified.
        """
        # Build a new mapping so the caller's dict is left untouched.
        all_columns = {
            **columns,
            "device_id": common.ColumnType.String,
            "timestamp": common.ColumnType.DateTime,
        }

        table = sqlalchemy.Table(
            table_name,
            self._metadata,
            *(
                sqlalchemy.Column(column_name, column_type.value)
                for column_name, column_type in all_columns.items()
            ),
        )
        table.create(bind=self._connection, checkfirst=True)

    def insert(self, table_name: str, values: List[Dict]) -> None:
        """Insert data.

        Each row is stored with this store's device id and the current UTC
        time added; the dictionaries passed in are not modified.

        Args:
            table_name: Name of a table known to the metadata.
            values: Rows to insert, one dictionary per row. An empty list is
                a no-op.

        Raises:
            KeyError: If ``table_name`` is not known to the metadata.
        """
        table = self._metadata.tables[table_name]
        if not values:
            # Nothing to insert; avoid issuing an INSERT with no rows.
            return

        rows = [
            {
                **item,
                "device_id": self._device_id,
                "timestamp": datetime.datetime.utcnow(),
            }
            for item in values
        ]
        self._session.execute(table.insert().values(rows))
        self._session.commit()

    def query_data(  # type: ignore
        self, table_name: str, range: Optional[common.Range] = None
    ) -> pd.DataFrame:
        """Query data from a given table within a time range.

        Args:
            table_name: Name of a table known to the metadata.
            range: Optional time range; rows are filtered on ``timestamp``
                from the range start up to and including its end (when set).

        Returns:
            A data frame with one column per table column.
        """
        table: sqlalchemy.Table = self._metadata.tables[table_name]
        query = self._session.query(table)

        if range is not None:
            start, end = range.get_range()
            if end is not None:
                query = query.filter(
                    table.c.timestamp >= start, table.c.timestamp <= end
                )
            else:
                query = query.filter(table.c.timestamp >= start)
        return pd.DataFrame(query.all(), columns=table.columns.keys())

create_table(table_name, columns)

Create a table.

Source code in datainventory/table_store.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def create_table(
    self, table_name: str, columns: Dict[str, common.ColumnType]
) -> None:
    """Create a table.

    ``device_id`` and ``timestamp`` columns are always added on top of the
    requested ones.

    Args:
        table_name: Name of the table to create.
        columns: Mapping of column name to column type. It is not modified.
    """
    # Build a new mapping so the caller's dict is left untouched.
    all_columns = {
        **columns,
        "device_id": common.ColumnType.String,
        "timestamp": common.ColumnType.DateTime,
    }

    table = sqlalchemy.Table(
        table_name,
        self._metadata,
        *(
            sqlalchemy.Column(column_name, column_type.value)
            for column_name, column_type in all_columns.items()
        ),
    )
    table.create(bind=self._connection, checkfirst=True)

insert(table_name, values)

Insert data.

Source code in datainventory/table_store.py
54
55
56
57
58
59
60
61
62
def insert(self, table_name: str, values: List[Dict]) -> None:
    """Insert data.

    Each row is stored with this store's device id and the current UTC time
    added; the dictionaries passed in are not modified.

    Args:
        table_name: Name of a table known to the metadata.
        values: Rows to insert, one dictionary per row. An empty list is a
            no-op.

    Raises:
        KeyError: If ``table_name`` is not known to the metadata.
    """
    table = self._metadata.tables[table_name]
    if not values:
        # Nothing to insert; avoid issuing an INSERT with no rows.
        return

    rows = [
        {
            **item,
            "device_id": self._device_id,
            "timestamp": datetime.datetime.utcnow(),
        }
        for item in values
    ]
    self._session.execute(table.insert().values(rows))
    self._session.commit()

query_data(table_name, range=None)

Query data from a given table within a time range.

Source code in datainventory/table_store.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def query_data(  # type: ignore
    self, table_name: str, range: Optional[common.Range] = None
) -> pd.DataFrame:
    """Read rows from a table, optionally restricted to a time range.

    Without a range every row is returned. With a range, rows are kept when
    their ``timestamp`` is at or after the start and, if the range has an end,
    at or before that end. The result has one column per table column.
    """
    table: sqlalchemy.Table = self._metadata.tables[table_name]
    rows_query = self._session.query(table)

    if range:
        start, end = range.get_range()
        lower_bound = table.c.timestamp >= start
        if end:
            rows_query = rows_query.filter(lower_bound, table.c.timestamp <= end)
        else:
            rows_query = rows_query.filter(lower_bound)

    results = rows_query.all()
    return pd.DataFrame(results, columns=table.columns.keys())