"""sysviewparser.py

Build a task/interrupt interaction graph from per-core trace CSV exports.

Each CSV is expected to provide at least the columns ``context``, ``event``
and ``detail``.  The script extracts per-task execution times, "runs after"
delays and queue (``xQueue=...``) interactions from the ``detail`` text and
renders the result with Graphviz as ``task_graph.pdf``.
"""
import pandas as pd
import re
from graphviz import Digraph
import colorsys # For HSV -> RGB conversion
import zlib # For computing a CRC32 hash
# Matchers applied to the free-text 'detail' field of every trace row.
# Numeric groups are captured as raw text (digits and dots); converting them
# to numbers is left to the caller.
pattern_run = re.compile(r"Runs for\s+([\d\.]+)\s*us", re.I)      # "Runs for <t> us"
pattern_after = re.compile(r"runs after\s+([\d\.]+)\s*us", re.I)  # "runs after <t> us"
pattern_res = re.compile(r"xQueue=([^\s]+)")                      # queue handle (case-sensitive)
pattern_prio = re.compile(r"Priority=(\d+)", re.I)                # "Priority=<n>"
def compute_stats(values):
    """Return ``(average, minimum, maximum, spread)`` for numeric samples.

    Args:
        values: Iterable of numbers (list, tuple, generator, array-like) or
            ``None``.  It is consumed exactly once.

    Returns:
        A 4-tuple ``(avg, minimum, maximum, spread)`` where ``spread`` is
        ``maximum - minimum``.  When there are no samples (empty input or
        ``None``) every element of the tuple is ``None``.
    """
    # Materialise first: a bare truthiness test is always true for generators
    # and raises for multi-element NumPy arrays / pandas Series.
    samples = [] if values is None else list(values)
    if not samples:
        return None, None, None, None
    avg = sum(samples) / len(samples)
    minimum = min(samples)
    maximum = max(samples)
    spread = maximum - minimum
    return avg, minimum, maximum, spread
def name_to_color(name: str, s: float = 0.7, v: float = 0.95) -> str:
    """Map a node name to a deterministic hex color.

    The hue is the CRC32 of the UTF-8 encoded name modulo 360, so the same
    name always gets the same color.  Only 360 distinct hues exist, which
    means different names can share a color.

    Args:
        name: Node name to derive the hue from.
        s: HSV saturation in the range 0..1.
        v: HSV value (brightness) in the range 0..1.

    Returns:
        The color formatted as ``'#rrggbb'`` (lowercase hex).
    """
    hue = zlib.crc32(name.encode('utf-8')) % 360
    r, g, b = colorsys.hsv_to_rgb(hue / 360.0, s, v)
    # Channels are truncated (not rounded) to 0..255.
    return f'#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}'
# ---------------------------------------------------------------------------
# Module-level accumulators, filled while the trace rows are scanned.
# ---------------------------------------------------------------------------

# Rows whose context or event appears here are skipped during edge/timing
# extraction.
no_display_contexts = ["Idle", "SysTick"]
no_display_events = [
    "ISR Exit",
    "Stack Info",
    "Log",
    "Task Info",
    "Task Create",
]

# Task name -> list of "Runs for" durations (in us).
exec_times = {}

# Resource node id -> label shown for that resource node.
resource_nodes = {}

# Direct task-to-task edges; each entry is {src, target, event, delays}.
normal_edges = []

# Rows that mention "xQueue" are routed through a resource node instead and
# produce two kinds of edges:
#   calling task -> resource : {src, resource, event}
#   resource -> target task  : {resource, target, event, delays}
res_edges_src = []
res_edges_target = []
# Default capture set: one CSV export per core.  Other capture sets used
# during development follow the same naming scheme (expdata_core1/2.csv,
# uwb_core1/2.csv, noprint_core1/2.csv, notify_core1/2.csv) and can be passed
# on the command line instead.
_DEFAULT_CSV_FILES = ("full_core1.csv", "full_core2.csv")


def _load_traces(csv_files):
    """Read every CSV export and return one DataFrame with an added 'core' column.

    The core is inferred from the file name: "core1" or "core2" when that
    text occurs in the path, otherwise "unknown".
    """
    frames = []
    for path in csv_files:
        frame = pd.read_csv(path)
        if "core1" in str(path):
            frame['core'] = "core1"
        elif "core2" in str(path):
            frame['core'] = "core2"
        else:
            frame['core'] = "unknown"
        frames.append(frame)
    return pd.concat(frames, ignore_index=True)


def _with_delay_stats(label, delays):
    """Return ``label`` with the average delay and spread appended, if any delays exist."""
    if delays:
        avg_delay, _min_delay, _max_delay, spread_delay = compute_stats(delays)
        label += f"\nDelay: avg {avg_delay:.2f}us (spread {spread_delay:.2f}us)"
    return label


def main(csv_files=_DEFAULT_CSV_FILES):
    """Parse the trace CSVs and render the task interaction graph.

    Reads ``csv_files``, prints the task priorities that were found, appends
    to the module-level accumulators (``exec_times``, ``normal_edges``,
    ``res_edges_src``, ``res_edges_target``, ``resource_nodes``) and renders
    ``task_graph`` as a PDF, opening it in a viewer if supported.

    Args:
        csv_files: Paths of the per-core CSV exports to load.
    """
    # ------------------------------------------------------------------------
    # 1. Load and integrate data from all cores.
    df = _load_traces(csv_files)

    # ------------------------------------------------------------------------
    # 2. In a single pass, record for every task the cores it was seen on and
    #    collect priorities from the "Task Info" rows logged in the "Idle"
    #    context.  Task names are left unchanged; core info is kept separately.
    task_cores = {}
    task_priority = {}
    for _, row in df.iterrows():
        context = row['context']
        task_cores.setdefault(context, set()).add(row['core'])
        if context == "Idle" and row['event'] == "Task Info":
            detail = str(row['detail'])
            m_prio = pattern_prio.search(detail)
            if m_prio:
                try:
                    priority = int(m_prio.group(1))
                except ValueError:
                    continue
                # The described task's name is the text before the first " (".
                described_task = detail.split(" (")[0]
                task_priority.setdefault(described_task, []).append(priority)
    print(task_priority)

    # Use the unique task names as nodes.
    tasks = df['context'].unique()

    # ------------------------------------------------------------------------
    # 3. Process every row to extract timing and edge info.
    # Index of the direct edges by (src, target, event) so that merging a
    # repeated event is a dict lookup instead of a scan over all edges.
    edge_index = {}
    for edge in normal_edges:
        edge_index.setdefault((edge["src"], edge["target"], edge["event"]), edge)

    for _, row in df.iterrows():
        src = row['context']  # task name without core info
        if src in no_display_contexts:
            continue
        event = row['event']
        if event in no_display_events:
            continue
        detail = str(row['detail'])  # ensure detail is a string

        # 3a. Execution timing ("Runs for").
        m_run = pattern_run.search(detail)
        if m_run:
            try:
                runtime = float(m_run.group(1))
            except ValueError:
                # The pattern also matches text such as "1.2.3"; ignore it.
                pass
            else:
                exec_times.setdefault(src, []).append(runtime)

        # 3b. "runs after ..." delay.
        delay_value = None
        m_after = pattern_after.search(detail)
        if m_after:
            try:
                delay_value = float(m_after.group(1))
            except ValueError:
                pass

        # 3c. Target tasks are the other task names that occur in the detail.
        # NOTE: this is a plain substring test, so a task whose name is
        # contained in another task's name will match as well.
        target_tasks = [t for t in tasks if t != src and t in detail]

        if "xQueue" in detail:
            # 3d. Resource event: route the interaction through a resource node.
            m_res = pattern_res.search(detail)
            if m_res:
                resource_address = m_res.group(1)
                resource_id = f"res_{resource_address}"
                resource_nodes[resource_id] = resource_address
            else:
                resource_id = "res_unknown"
                resource_nodes.setdefault(resource_id, "Unknown Resource")
            res_edges_src.append({
                "src": src,
                "resource": resource_id,
                "event": event,
            })
            for target in target_tasks:
                res_edges_target.append({
                    "resource": resource_id,
                    "target": target,
                    "event": event,
                    # A fresh list per edge; the lists must not be shared.
                    "delays": [delay_value] if delay_value is not None else [],
                })
        else:
            # 3e. Normal event: one edge per (src, target, event); repeated
            #     occurrences only contribute their delay.
            for target in target_tasks:
                key = (src, target, event)
                edge = edge_index.get(key)
                if edge is None:
                    edge = {"src": src, "target": target, "event": event, "delays": []}
                    normal_edges.append(edge)
                    edge_index[key] = edge
                if delay_value is not None:
                    edge["delays"].append(delay_value)

    # ------------------------------------------------------------------------
    # 4. Deduplicate resource edges.
    # a) Source-to-resource edges: keep the first edge seen for each key.
    unique_res_edges_src = {}
    for r_edge in res_edges_src:
        key = (r_edge["src"], r_edge["resource"], r_edge["event"])
        unique_res_edges_src.setdefault(key, r_edge)
    dedup_res_edges_src = list(unique_res_edges_src.values())

    # b) Resource-to-target edges: one edge per key with all delays merged.
    unique_res_edges_target = {}
    for r_edge in res_edges_target:
        key = (r_edge["resource"], r_edge["target"], r_edge["event"])
        merged = unique_res_edges_target.setdefault(key, {
            "resource": r_edge["resource"],
            "target": r_edge["target"],
            "event": r_edge["event"],
            "delays": [],
        })
        merged["delays"].extend(r_edge["delays"])
    dedup_res_edges_target = list(unique_res_edges_target.values())

    # ------------------------------------------------------------------------
    # 5. Compute per-task execution statistics.
    stats = {}
    for task in tasks:
        avg_run, min_run, max_run, spread_run = compute_stats(exec_times.get(task, []))
        stats[task] = {
            "avg_run": avg_run,
            "min_run": min_run,
            "max_run": max_run,
            "spread_run": spread_run,
        }

    # ------------------------------------------------------------------------
    # 6. Build the Graphviz graph.
    # For a more relaxed layout, engine='neato' together with
    # dot.attr(overlap='false', splines='true', sep='+10') can be tried.
    dot = Digraph(comment='Task/Interrupt Interaction Graph', format='pdf')

    # One node per task.  The label holds the cores the task was observed on,
    # the task name, its priority (if known) and execution statistics (if any).
    for task in tasks:
        cores = sorted(task_cores.get(task, []))
        label = f"{','.join(cores)}: {task}" if cores else task
        if task in task_priority:
            label += f" p:{task_priority[task][0]}"
        st = stats.get(task, {})
        if st.get("avg_run") is not None:
            label += f"\nExec: avg {st['avg_run']:.2f}us (spread {st['spread_run']:.2f}us)"
        dot.node(task, label, style='filled', fillcolor=name_to_color(task))

    # One box-shaped node per resource with a gray fill.  Graphviz only
    # applies fillcolor when the node style is 'filled'.
    for res_id, res_label in resource_nodes.items():
        dot.node(res_id, res_label, shape="box", style="filled", fillcolor="#787878")

    # Direct task-to-task edges.
    for edge in normal_edges:
        dot.edge(edge["src"], edge["target"],
                 label=_with_delay_stats(edge["event"], edge["delays"]))

    # Resource edges, a) calling task -> resource, colored like the caller.
    for r_edge in dedup_res_edges_src:
        label = r_edge["event"]
        if "xQueueGeneric" in label:
            # Keep only the part of the event name after "xQueueGeneric".
            label = label.split("xQueueGeneric")[1]
        caller_color = name_to_color(r_edge["src"])
        dot.edge(r_edge["src"], r_edge["resource"], label=label,
                 color=caller_color, fontcolor=caller_color)

    # Resource edges, b) resource -> target task.
    for r_edge in dedup_res_edges_target:
        dot.edge(r_edge["resource"], r_edge["target"],
                 label=_with_delay_stats(r_edge["event"], r_edge["delays"]))

    # Render (and open, if supported) the resulting graph as a PDF.
    dot.render('task_graph', view=True)


if __name__ == "__main__":
    import sys

    # Optional command-line arguments replace the default CSV list.
    main(sys.argv[1:] or _DEFAULT_CSV_FILES)