Coverage for src / app / secure_system.py: 100%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 21:18 +0000

1""" 

2Secure System Module. 

3 

4This module demonstrates a secure implementation of a user management service 

5following strict typing and security guidelines. 

6""" 

7 

8from abc import ABC, abstractmethod 

9from uuid import UUID, uuid4 

10 

11from pydantic import EmailStr, Field, SecretStr 

12from pydantic.networks import IPvAnyAddress 

13 

14from taipanstack.core.result import Err, Ok, Result 

15from taipanstack.security import SecureBaseModel, hash_password 

16from taipanstack.utils.logging import get_logger 

17 

18# Configure logger 

19logger = get_logger(__name__) 

20 

21 

class UserNotFoundError(Exception):
    """Raised when no user is found for a given ID."""

    def __init__(self, user_id: UUID) -> None:
        """
        Build the error message from the ID and keep the ID on the instance.

        Args:
            user_id: The UUID that could not be found.

        """
        description = f"User with ID {user_id} not found."
        super().__init__(description)
        # Kept as an attribute so handlers can read the ID without parsing text.
        self.user_id = user_id

29 

30 

class UserAlreadyExistsError(Exception):
    """Raised when a user already exists."""

    def __init__(self, message: str) -> None:
        """
        Pass the message to ``Exception`` and expose it as ``self.message``.

        Args:
            message: Human-readable description of the conflict.

        """
        super().__init__(message)
        self.message = message

38 

39 

class UserCreationError(Exception):
    """Raised or returned when creating a user fails."""

    def __init__(self, message: str = "Failed to create user") -> None:
        """
        Pass the message to ``Exception`` and expose it as ``self.message``.

        Args:
            message: Description of the failure; a generic text is used when
                the caller supplies none.

        """
        super().__init__(message)
        self.message = message

47 

48 

class UserCreate(SecureBaseModel):
    """
    Input payload for creating a new user.

    Attributes:
        username: Required; 3 to 50 characters, limited to ASCII letters,
            digits and underscores.
        email: Required email address of the user.
        password: Required password, held as a ``SecretStr``.
        ip_address: Optional IP address the registration came from;
            ``None`` when not supplied.

    """

    username: str = Field(
        ...,
        min_length=3,
        max_length=50,
        pattern=r"^[a-zA-Z0-9_]+$",
    )
    email: EmailStr
    password: SecretStr
    ip_address: IPvAnyAddress | None = None

65 

66 

class User(SecureBaseModel):
    """
    Public view of a registered user.

    This model carries no password material; the stored hash is declared on
    the ``UserInDB`` subclass only.

    Attributes:
        id: Unique identifier of the user.
        username: The user's username.
        email: The user's email address.
        is_active: Whether the account is active; ``True`` unless set otherwise.

    """

    id: UUID
    username: str
    email: EmailStr
    is_active: bool = True

83 

84 

class UserInDB(User):
    """
    Stored form of a registered user.

    Inherits every field of ``User`` and adds the hashed password.

    Attributes:
        password_hash: The hashed password of the user.

    """

    password_hash: str

95 

96 

class UserRepository(ABC):
    """Abstract interface for storing and retrieving users."""

    @abstractmethod
    def save(self, user: UserInDB) -> None:
        """
        Store the given user.

        Args:
            user: The user record to store.

        """

    @abstractmethod
    def get_by_id(self, user_id: UUID) -> UserInDB | None:
        """
        Look up a user by ID.

        Args:
            user_id: The UUID of the user to look up.

        Returns:
            The matching UserInDB, or None when there is no match.

        """

122 

123 

class InMemoryUserRepository(UserRepository):
    """UserRepository backed by a plain dict held on the instance."""

    def __init__(self) -> None:
        """Start with an empty store."""
        self._storage: dict[UUID, UserInDB] = {}

    def save(self, user: UserInDB) -> None:
        """
        Store the user under its ID.

        An existing entry with the same ID is replaced.

        Args:
            user: The user record to store.

        """
        record_id = user.id
        self._storage[record_id] = user

    def get_by_id(self, user_id: UUID) -> UserInDB | None:
        """
        Look up a user in the in-memory store.

        Args:
            user_id: The UUID of the user to look up.

        Returns:
            The stored UserInDB, or None when the ID is not present.

        """
        found = self._storage.get(user_id)
        return found

153 

154 

class UserService:
    """Service for managing users securely."""

    def __init__(self, user_repository: UserRepository) -> None:
        """
        Initialize the UserService with a repository.

        Args:
            user_repository: The repository to use for data access.

        """
        self._user_repository = user_repository

    @staticmethod
    def _to_public_user(user_in_db: UserInDB) -> User:
        """
        Copy a stored record into the public User model.

        Only id, username, email and is_active are copied, so the password
        hash never leaves the service.

        Args:
            user_in_db: The stored user record.

        Returns:
            A User holding the public fields of the record.

        """
        return User(
            id=user_in_db.id,
            username=user_in_db.username,
            email=user_in_db.email,
            is_active=user_in_db.is_active,
        )

    def create_user(self, user_create: UserCreate) -> Result[User, UserCreationError]:
        """
        Create a new user.

        The password is passed through ``hash_password`` and the record is
        saved under a freshly generated UUID. ``user_create.ip_address`` is
        not read here.

        Args:
            user_create: The user creation data.

        Returns:
            Ok(User) on success, Err(UserCreationError) when the repository
            raises UserAlreadyExistsError. Any other exception raised by the
            repository propagates to the caller.

        """
        # Hash the password using the security module
        pwd_hash = hash_password(user_create.password)

        user_in_db = UserInDB(
            id=uuid4(),
            username=user_create.username,
            email=user_create.email,
            password_hash=pwd_hash,
        )

        # Only save() can raise the handled error, so it is the sole
        # statement inside the try body.
        try:
            self._user_repository.save(user_in_db)
        except UserAlreadyExistsError as e:
            logger.warning("Failed to create user", user_id=user_in_db.id)
            return Err(UserCreationError(message=str(e)))

        logger.info("User created successfully", user_id=user_in_db.id)
        # Return the public User model, excluding the password hash
        return Ok(self._to_public_user(user_in_db))

    def get_user(self, user_id: UUID) -> Result[User, UserNotFoundError]:
        """
        Retrieve a user by ID using Result pattern.

        Args:
            user_id: The UUID of the user.

        Returns:
            Ok(User) if found, Err(UserNotFoundError) if not found.

        Example:
            >>> match service.get_user(some_id):
            ...     case Ok(user):
            ...         print(f"Found: {user.username}")
            ...     case Err(error):
            ...         print(f"Not found: {error.user_id}")

        """
        user_in_db = self._user_repository.get_by_id(user_id)
        if user_in_db is None:
            logger.warning("User lookup failed", user_id=user_id)
            return Err(UserNotFoundError(user_id))

        return Ok(self._to_public_user(user_in_db))

233 return Ok(public_user)