第一步:了解神经网络的基本概念
:smile:矩阵点乘规则:当矩阵A的列数(column)等于矩阵B的行数(row)时,A与B可以相乘
2行3列 * 3行2列 = 2行2列 (取第一个矩阵的行和第二个矩阵的列)
3行2列 * 2行3列 = 3行3列 (取第一个矩阵的行和第二个矩阵的列)
神经网络是一种模拟生物神经网络的计算模型,用于处理复杂的模式识别问题。它们由一系列神经元 (或称节点)组成,这些神经元通过权重 相连,权重是神经网络中最重要的参数之一。每个神经元接收输入信号,并应用一个激活函数 来决定输出信号。
关键术语:
神经元(Neuron) :基本的处理单元,接收输入并生成输出。
权重(Weight) :连接神经元之间的参数,决定信号的强度。
偏置(Bias) :一个神经元输出中独立于输入的项,帮助调整输出。
激活函数(Activation
Function) :用于非线性转换神经元的输出,例如Sigmoid, Tanh,
ReLU。
一个简单的神经网络结构:
1 输入层(Input Layer) → 隐藏层(Hidden Layer) → 输出层(Output Layer)
输入层 :接收输入数据。
隐藏层 :中间层,可以有多层,用于提取特征。
输出层 :输出最终的结果。
练习:
请你简单描述神经网络是什么。
列出并解释神经网络的三个主要组件。
神经网络是一种模拟生物神经网络的计算模型,用于处理复杂的模式识别问题;三个主要组件是输入层、隐藏层、输出层
第二步:理解一个简单神经元的工作原理
一个神经元接收输入、计算加权和并应用激活函数来生成输出。让我们更详细地看一下一个简单神经元的工作过程:
神经元的数学表达:
假设我们有一个输入 ( x ),它经过权重 ( w ) 和偏置 ( b )
调整,然后通过激活函数 ( f ) 来生成输出 ( y ):
$ y = f(w x + b) $
其中: - ( x ) 是输入。 - ( w ) 是权重。 - ( b ) 是偏置。 - ( f )
是激活函数(例如:Sigmoid、ReLU 等)。
常见激活函数:
Sigmoid 函数 :将输入映射到 (0) 和 (1) 之间。 $
(x) = $
ReLU 函数 :将负数部分映射为 (0),正数部分不变。
$ (x) = (0, x) $
练习:
假设你有一个输入 ( x = 2 ),权重 ( w = 0.5 ),偏置 ( b = 1
),并且使用 Sigmoid 激活函数。计算输出 ( y )。
为什么激活函数是必要的?
输入 x=2,权重 w=0.5,偏置 b=1,计算输出 y。
计算步骤如下:
首先,计算加权和及偏置: \(w⋅x+b=0.5⋅2+1=1+1=2\)
然后,计算 Sigmoid 函数的值: \(\sigma(2)
= \frac{1}{1 + e^{-2}} \approx 0.88\)
因此,输出$ y≈0.88$
为什么激活函数是必要的?
激活函数引入了非线性,使得神经网络能够学习和表示复杂的模式。如果没有激活函数,网络只会进行线性变换(输入的加权求和),这无法有效地处理复杂的模式或决策问题。
第三步:构建一个简单的神经元
接下来,我们将把刚才学到的知识应用于构建一个简单的神经元,并用 Python
代码来实现它。
构建步骤:
定义神经元的结构 :包括输入、权重、偏置和激活函数。
实现加权和及偏置 :计算 \(( w \cdot x + b )\) 。
应用激活函数 :使用 Sigmoid 函数来计算输出。
Python 代码实现
以下是一个简单神经元的 Python 实现,包括输入、权重、偏置和 Sigmoid
激活函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import mathdef sigmoid (x ): return 1 / (1 + math.exp(-x)) x = 2 w = 0.5 b = 1 z = w * x + b y = sigmoid(z)print (f"输出 y = {y} " )
代码解释:
定义了一个 sigmoid
函数来计算 Sigmoid
激活函数的值。
设置了输入 ( x )、权重 ( w ) 和偏置 ( b )。
计算了加权和及偏置 ( z )。
使用 sigmoid
函数计算输出 ( y )。
打印输出结果。
练习:
请在你的 Python 环境中运行上述代码,观察输出。
尝试修改输入 ( x )、权重 ( w ) 和偏置 ( b )
的值,观察输出是如何变化的。
第四步:构建一个简单的神经网络
我们将构建一个包含输入层、一个隐藏层和输出层的简单神经网络。这个神经网络将接受两个输入,经过一个隐藏层处理后,生成一个输出。
神经网络结构:
具体步骤:
初始化输入、权重和偏置 。
计算隐藏层的输出 。
计算输出层的输出 。
Python 代码实现
以下代码展示了如何构建和运行这个简单的神经网络:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import numpy as npdef sigmoid (x ): return 1 / (1 + np.exp(-x)) inputs = np.array([1 , 2 ]) hidden_weights = np.array([[0.5 , -0.6 ], [0.1 , 0.8 ]]) hidden_bias = np.array([0.2 , 0.3 ]) output_weights = np.array([0.4 , -0.7 ]) output_bias = 0.1 hidden_input = np.dot(hidden_weights, inputs) + hidden_bias hidden_output = sigmoid(hidden_input) output_input = np.dot(output_weights, hidden_output) + output_bias output = sigmoid(output_input)print (f"隐藏层输出: {hidden_output} " )print (f"最终输出: {output} " )
代码解释:
定义 Sigmoid 激活函数 :使用 numpy
的
exp
函数处理数组。
初始化输入 :inputs
是一个包含 2
个输入的数组。
初始化隐藏层的权重和偏置 :hidden_weights
是 2x2 的权重矩阵,hidden_bias
是包含 2 个偏置的数组。
初始化输出层的权重和偏置 :output_weights
是 1x2 的权重向量,output_bias
是一个单一的偏置值。
计算隐藏层的输出 :使用 np.dot
计算权重和输入的点积,加上偏置,再通过 sigmoid
函数处理。
计算输出层的输出 :类似地,使用隐藏层的输出计算输出层的最终输出。
练习:
请在你的 Python 环境中运行上述代码,并观察输出结果。
尝试修改输入、隐藏层和输出层的权重和偏置,观察输出是如何变化的。
第五步:理解神经网络的训练过程
神经网络的训练主要包括两个阶段:前向传播(Forward
Propagation) 和
反向传播(Backpropagation) 。我们来逐步理解这两个过程。
1. 前向传播(Forward
Propagation)
前向传播是将输入数据通过神经网络,计算出预测的输出。
步骤 :
输入数据通过输入层传递到隐藏层。
隐藏层的输出传递到输出层。
输出层生成最终的预测输出。
2. 损失函数(Loss Function)
损失函数用于衡量预测输出与真实值之间的差距。一个常见的损失函数是均方误差(Mean
Squared Error, MSE) :
$ = _{i=1}^N ( - y_i)^2 $
其中: - $ N $是样本数量。 - $ $是预测值。 - \(y_i\) 是真实值。
3. 反向传播(Backpropagation)
反向传播用于调整神经网络中的权重和偏置,以最小化损失函数的值。它基于梯度下降算法,通过计算损失函数相对于每个权重和偏置的偏导数来更新它们。
步骤 : 1. 计算损失函数的梯度。 2.
反向传播这些梯度,通过链式法则调整每一层的权重和偏置。
Python 代码实现(简化版)
我们将实现一个简单的神经网络训练过程,使用一个输入样本进行训练。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 import numpy as npdef sigmoid (x ): return 1 / (1 + np.exp(-x))def sigmoid_derivative (x ): return x * (1 - x) inputs = np.array([1 , 2 ]) true_output = np.array([0.8 ]) hidden_weights = np.array([[0.5 , -0.6 ], [0.1 , 0.8 ]]) hidden_bias = np.array([0.2 , 0.3 ]) output_weights = np.array([[0.4 , -0.7 ]]) output_bias = 0.1 learning_rate = 0.5 hidden_input = np.dot(hidden_weights, inputs) + hidden_bias hidden_output = sigmoid(hidden_input) output_input = np.dot(output_weights, hidden_output) + output_bias predicted_output = sigmoid(output_input) loss = np.mean((true_output - predicted_output) ** 2 ) output_error = true_output - predicted_output output_delta = output_error * sigmoid_derivative(predicted_output) hidden_error = output_delta.dot(output_weights) hidden_delta = hidden_error * sigmoid_derivative(hidden_output) output_weights += learning_rate * output_delta * hidden_output output_bias += learning_rate * output_delta hidden_weights += learning_rate * np.outer(hidden_delta, inputs) hidden_bias += learning_rate * hidden_deltaprint (f"预测输出: {predicted_output} " )print (f"损失: {loss} " )print (f"更新后的权重和偏置: {output_weights} , {output_bias} , {hidden_weights} , {hidden_bias} " )
代码解释:
定义激活函数及其导数 :sigmoid
及
sigmoid_derivative
。
初始化输入和真实输出 :inputs
和
true_output
。
初始化权重和偏置 :hidden_weights
、hidden_bias
、output_weights
和 output_bias
。
设置学习率 。
前向传播 :计算隐藏层和输出层的输出。
计算损失 :使用均方误差。
反向传播 :计算输出层和隐藏层的误差及更新量。
更新权重和偏置 。
练习:
在你的 Python 环境中运行上述代码,观察输出和权重更新。
尝试修改输入、真实输出、权重和偏置的初始值,观察训练过程中的变化。
第六步:扩展神经网络
我们将扩展神经网络,增加更多的隐藏层和神经元,以处理更复杂的数据。然后,我们会使用批量梯度下降和其他优化技巧。
批量梯度下降(Batch Gradient
Descent)
批量梯度下降是训练神经网络的一个重要方法,它在整个训练数据集上计算损失并更新权重。这种方法有助于稳定训练过程。
扩展网络的步骤:
增加更多的隐藏层 :可以通过增加更多的隐藏层和神经元来提高网络的表现力。
引入更多激活函数 :比如
ReLU,可以处理更复杂的非线性关系。
使用批量梯度下降 :在整个训练集上计算损失并更新权重。
Python 代码实现(扩展版)
以下代码展示了一个包含更多隐藏层的神经网络,并使用批量梯度下降来训练:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 import numpy as npdef sigmoid (x ): return 1 / (1 + np.exp(-x))def sigmoid_derivative (x ): return x * (1 - x) inputs = np.array([[1 , 2 ], [0.5 , 0.3 ], [0.2 , 0.8 ]]) true_outputs = np.array([[0.8 ], [0.4 ], [0.6 ]]) input_size = 2 hidden_size1 = 3 hidden_size2 = 2 output_size = 1 hidden_weights1 = np.random.rand(hidden_size1, input_size) hidden_bias1 = np.random.rand(hidden_size1) hidden_weights2 = np.random.rand(hidden_size2, hidden_size1) hidden_bias2 = np.random.rand(hidden_size2) output_weights = np.random.rand(output_size, hidden_size2) output_bias = np.random.rand(output_size) learning_rate = 0.5 epochs = 10000 for epoch in range (epochs): hidden_input1 = np.dot(inputs, hidden_weights1.T) + hidden_bias1 hidden_output1 = sigmoid(hidden_input1) hidden_input2 = np.dot(hidden_output1, hidden_weights2.T) + hidden_bias2 hidden_output2 = sigmoid(hidden_input2) output_input = np.dot(hidden_output2, output_weights.T) + output_bias predicted_output = sigmoid(output_input) loss = np.mean((true_outputs - predicted_output) ** 2 ) output_error = true_outputs - predicted_output output_delta = output_error * sigmoid_derivative(predicted_output) hidden_error2 = np.dot(output_delta, output_weights) hidden_delta2 = hidden_error2 * sigmoid_derivative(hidden_output2) hidden_error1 = np.dot(hidden_delta2, hidden_weights2) hidden_delta1 = hidden_error1 * sigmoid_derivative(hidden_output1) output_weights += learning_rate * np.dot(output_delta.T, hidden_output2) output_bias += learning_rate * np.mean(output_delta, axis=0 ) hidden_weights2 += learning_rate * np.dot(hidden_delta2.T, hidden_output1) hidden_bias2 += learning_rate * np.mean(hidden_delta2, axis=0 ) hidden_weights1 += learning_rate * np.dot(hidden_delta1.T, inputs) hidden_bias1 += learning_rate * np.mean(hidden_delta1, axis=0 ) if epoch % 1000 == 0 : print (f"Epoch {epoch} , Loss: {loss} " )print (f"最终预测输出: {predicted_output} " )
代码解释:
扩展数据集 :使用多个样本。
增加隐藏层 :增加一个额外的隐藏层
hidden_layer2
。
批量梯度下降 :在每个 epoch 中更新权重和偏置。
输出预测结果 :打印损失并显示最终预测输出。
练习:
请在你的 Python 环境中运行上述代码,并观察输出结果。
尝试调整网络结构(如增加神经元数量或隐藏层),观察对训练过程的影响。
了解批量大小(batch size)和 epoch 数对训练的影响。
常见问题:
如何避免过拟合?
使用正则化技术(如 L2 正则化)。
使用 Dropout 技术。
增加数据集。
如何选择学习率?
学习率过高会导致训练不稳定,过低会导致训练过慢。可以使用学习率调度器逐步调整。
True Outputs 超过 1 的情况
1. 使用 Sigmoid
作为输出层激活函数
Sigmoid 函数 的输出范围是 0 到 1。如果
true_outputs
超过 1,会导致以下问题:
不匹配的输出范围 :Sigmoid 函数的输出永远不会超过
1,因此 true_outputs
超过 1
会导致较大的误差,可能影响训练效果。
损失函数计算问题 :如果使用均方误差 (MSE)
作为损失函数,过大的误差会导致梯度过大,可能使训练不稳定。
解决方法:
归一化输出 :将 true_outputs
归一化到 0
到 1 范围内,使之与 Sigmoid 函数的输出匹配。
使用不同的激活函数 :如果输出不应该被限制在 0 到 1
范围,可以考虑其他激活函数,如线性激活函数(即没有激活函数)。
修改代码以处理 True Outputs
超过 1
方法 1:归一化 True Outputs
1 2 3 4 5 true_outputs = np.array([[8 ], [4 ], [6 ]]) true_outputs = true_outputs / 10.0
方法 2:使用线性激活函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def linear (x ): return xdef linear_derivative (x ): return 1 hidden_output2 = sigmoid(hidden_input2) output_input = np.dot(hidden_output2, output_weights.T) + output_bias predicted_output = linear(output_input) output_error = true_outputs - predicted_output output_delta = output_error * linear_derivative(predicted_output)
示例代码:使用线性激活函数的完整示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 import numpy as npdef sigmoid (x ): return 1 / (1 + np.exp(-x))def sigmoid_derivative (x ): return x * (1 - x)def linear (x ): return xdef linear_derivative (x ): return 1 inputs = np.array([[1 , 2 ], [0.5 , 0.3 ], [0.2 , 0.8 ]]) true_outputs = np.array([[8 ], [4 ], [6 ]]) input_size = 2 hidden_size1 = 3 hidden_size2 = 2 output_size = 1 hidden_weights1 = np.random.rand(hidden_size1, input_size) hidden_bias1 = np.random.rand(hidden_size1) hidden_weights2 = np.random.rand(hidden_size2, hidden_size1) hidden_bias2 = np.random.rand(hidden_size2) output_weights = np.random.rand(output_size, hidden_size2) output_bias = np.random.rand(output_size) learning_rate = 0.5 epochs = 10000 for epoch in range (epochs): hidden_input1 = np.dot(inputs, hidden_weights1.T) + hidden_bias1 hidden_output1 = sigmoid(hidden_input1) hidden_input2 = np.dot(hidden_output1, hidden_weights2.T) + hidden_bias2 hidden_output2 = sigmoid(hidden_input2) output_input = np.dot(hidden_output2, output_weights.T) + output_bias predicted_output = linear(output_input) loss = np.mean((true_outputs - predicted_output) ** 2 ) output_error = true_outputs - predicted_output output_delta = output_error * linear_derivative(predicted_output) hidden_error2 = np.dot(output_delta, output_weights) hidden_delta2 = hidden_error2 * sigmoid_derivative(hidden_output2) hidden_error1 = np.dot(hidden_delta2, hidden_weights2) hidden_delta1 = hidden_error1 * sigmoid_derivative(hidden_output1) output_weights += learning_rate * np.dot(output_delta.T, hidden_output2) output_bias += learning_rate * np.mean(output_delta, axis=0 ) hidden_weights2 += learning_rate * np.dot(hidden_delta2.T, hidden_output1) hidden_bias2 += learning_rate * np.mean(hidden_delta2, axis=0 ) hidden_weights1 += learning_rate * np.dot(hidden_delta1.T, inputs) hidden_bias1 += learning_rate * np.mean(hidden_delta1, axis=0 ) if epoch % 1000 == 0 : print (f"Epoch {epoch} , Loss: {loss} " )print (f"最终预测输出: {predicted_output} " )
总结
归一化 true_outputs
是一种简单的处理方法,可以让你的网络输出与 true_outputs
匹配。
使用线性激活函数 可以适应范围超过 0-1 的
true_outputs
,适合于回归问题等情况。
你可以根据需要选择合适的处理方法,或者结合两者,确保网络输出与目标范围匹配。如果有进一步的问题或需要更多解释,请随时告知!
各类神经网络在投资中的应用
在投资领域,神经网络和深度学习技术有许多应用,主要用于数据分析、市场预测、交易策略的自动化等。以下是各类神经网络在投资中的具体应用及适用情况:
高级优化算法
应用场景 :优化交易策略、调优模型参数。
适用情况 :适用于需要精确控制模型训练过程的场景,如量化交易策略优化。
深度学习框架
应用场景 :快速构建和部署复杂的模型,如股票价格预测、投资组合优化。
适用情况 :适用于需要构建大规模深度学习模型的任务。
卷积神经网络(CNN)
应用场景 :分析金融市场中的图像数据,如股票走势图、财经新闻图片。
适用情况 :适用于从图像数据中提取信息,例如技术图表分析,但在投资中应用较少,除非涉及图像或图表识别。
递归神经网络(RNN)
应用场景 :时间序列预测,如股票价格走势预测、经济指标分析。
适用情况 :非常适合处理金融时间序列数据,能捕捉长期依赖关系。LSTM
和 GRU 是常用的变体。
正则化技术
应用场景 :防止模型在训练数据上过拟合,提高模型的泛化能力。
适用情况 :适用于所有神经网络模型,以提高其稳定性和性能。
超参数调优
应用场景 :自动寻找最佳模型参数组合,如调优学习率、层数、神经元数量。
适用情况 :适用于需要自动化调优过程的场景。
迁移学习
应用场景 :利用预训练模型在金融数据上微调,如利用新闻分类模型来分析财经新闻对市场的影响。
适用情况 :适用于有相关预训练模型且新任务数据有限的情况。
生成对抗网络(GAN)
应用场景 :生成虚拟的市场数据用于模拟或测试交易策略。
适用情况 :适用于生成新的数据集或增强数据集,但在金融领域应用较少。
最适合投资的神经网络类型
递归神经网络(RNN) 及其变体(如 LSTM 和
GRU)是投资领域最常用的神经网络类型,特别适合时间序列预测。这类模型能够处理过去的金融数据并预测未来的市场行为。
深度学习框架 (如 TensorFlow 和
PyTorch)能够帮助你快速构建和部署这些复杂的模型。此外,结合高级优化算法 和正则化技术 ,可以进一步提高模型的性能和稳定性。
实现步骤
使用 RNN/LSTM
进行股票价格预测
数据准备
收集并预处理时间序列数据,如股票价格、交易量等。
常见数据源包括 Yahoo Finance、Alpha Vantage 等。
模型构建
构建 RNN 或 LSTM 模型,设定输入层、隐藏层和输出层。
定义损失函数(如 MSE)和优化算法(如 Adam)。
模型训练
使用历史数据训练模型,使用批量梯度下降优化模型参数。
可以设置多次迭代(epochs)和适当的学习率。
模型评估
在测试数据集上评估模型性能,使用指标如 RMSE、MAE 等。
预测和应用
使用训练好的模型进行未来价格预测,结合投资策略进行应用。
代码示例
以下是使用 LSTM 进行股票价格预测的简化示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 import numpy as npimport pandas as pdfrom sklearn.preprocessing import MinMaxScalerdef load_stock_data (file_path ): data = pd.read_csv(file_path, parse_dates=['Date' ], index_col='Date' ) return datadef calculate_technical_indicators (data ): data['SMA' ] = data['close' ].rolling(window=10 ).mean() data['EMA' ] = data['close' ].ewm(span=10 , adjust=False ).mean() delta = data['close' ].diff() gain = (delta.where(delta > 0 , 0 )).rolling(window=14 ).mean() loss = (-delta.where(delta < 0 , 0 )).rolling(window=14 ).mean() rs = gain / loss data['RSI' ] = 100 - (100 / (1 + rs)) data['Bollinger Upper' ] = data['SMA' ] + 2 * \ data['close' ].rolling(window=20 ).std() data['Bollinger Lower' ] = data['SMA' ] - 2 * \ data['close' ].rolling(window=20 ).std() data = data.dropna() return datadef preprocess_data (data ): scaler = MinMaxScaler(feature_range=(0 , 1 )) scaled_data = scaler.fit_transform(data) return scaled_data, scalerdef create_dataset (data, seq_length ): X, y = [], [] for i in range (len (data) - seq_length - 1 ): X.append(data[i:(i + seq_length)]) y.append(data[i + seq_length, 3 ]) return np.array(X), np.array(y)class LSTMCell : def __init__ (self, input_size, hidden_size ): self .input_size = input_size self .hidden_size = hidden_size self .Wf = np.random.randn(hidden_size, input_size + hidden_size) * 0.01 self .Wi = np.random.randn(hidden_size, input_size + hidden_size) * 0.01 self .Wc = np.random.randn(hidden_size, input_size + hidden_size) * 0.01 self .Wo = np.random.randn(hidden_size, input_size + hidden_size) * 0.01 self .bf = np.zeros((hidden_size, 1 )) self .bi = np.zeros((hidden_size, 1 )) self .bc = np.zeros((hidden_size, 1 )) self .bo = np.zeros((hidden_size, 1 )) def forward (self, x, h_prev, c_prev ): combined = np.concatenate((x, h_prev), axis=0 ) ft = sigmoid(np.dot(self .Wf, combined) + self .bf) it = sigmoid(np.dot(self .Wi, combined) + self .bi) c_hat = np.tanh(np.dot(self .Wc, combined) + self .bc) c = ft * c_prev + it * c_hat ot = sigmoid(np.dot(self .Wo, combined) + self .bo) h = ot * np.tanh(c) return h, cdef sigmoid (x ): return 1 / (1 + np.exp(-x))def train_lstm (X, y, hidden_size, epochs, learning_rate ): input_size = X.shape[2 ] lstm_cell = LSTMCell(input_size, hidden_size) W_out = np.random.randn(1 , hidden_size) * 0.01 b_out = np.zeros((1 , 1 )) for epoch in range (epochs): total_loss = 0 for i in range (X.shape[0 ]): h_prev = np.zeros((hidden_size, 1 )) c_prev = np.zeros((hidden_size, 1 )) for t in range (X.shape[1 ]): x_t = X[i, t, :].reshape(-1 , 1 ) h_prev, c_prev = lstm_cell.forward(x_t, h_prev, c_prev) y_hat = np.dot(W_out, h_prev) + b_out loss = np.mean((y_hat - y[i]) ** 2 ) total_loss += loss dW_out = (y_hat - y[i]) * h_prev.T db_out = (y_hat - y[i]) W_out -= learning_rate * dW_out b_out -= learning_rate * db_out if epoch % 100 == 0 : print (f"Epoch {epoch} , Loss: {total_loss / X.shape[0 ]} " ) return lstm_cell, W_out, b_outdef predict_lstm (X, lstm_cell, W_out, b_out ): h_prev = np.zeros((lstm_cell.hidden_size, 1 )) c_prev = np.zeros((lstm_cell.hidden_size, 1 )) for t in range (X.shape[1 ]): x_t = X[t, :].reshape(-1 , 1 ) h_prev, c_prev = lstm_cell.forward(x_t, h_prev, c_prev) y_hat = np.dot(W_out, h_prev) + b_out return y_hat file_path = 'data_600519.csv' data = load_stock_data(file_path) data = calculate_technical_indicators(data) scaled_data, scaler = preprocess_data( data[['open' , 'high' , 'low' , 'close' , 'volume' , 'SMA' , 'EMA' , 'RSI' , 'Bollinger Upper' , 'Bollinger Lower' ]]) seq_length = 10 X, y = create_dataset(scaled_data, seq_length) hidden_size = 10 epochs = 500 learning_rate = 0.5 lstm_cell, W_out, b_out = train_lstm(X, y, hidden_size, epochs, learning_rate) X_test, y_test = create_dataset(scaled_data[-20 :], seq_length) predicted = [predict_lstm(X_test[i], lstm_cell, W_out, b_out) for i in range (len (X_test))] predicted = np.array(predicted).reshape(-1 , 1 ) num_features = scaled_data.shape[1 ] predicted_full = np.zeros((predicted.shape[0 ], num_features)) predicted_full[:, 3 ] = predicted[:, 0 ] original_predicted_full = scaler.inverse_transform(predicted_full) original_predicted = original_predicted_full[:, 3 ] pingjun = np.mean(original_predicted)print (f"预测结果: {original_predicted} " )print (f"预测结果(平均值): {pingjun} " )
总结
RNN/LSTM
非常适合处理时间序列数据,用于投资领域的市场预测和策略制定。
你可以结合深度学习框架 和高级优化算法 来提高模型的性能和效率。
结合投资实际需求选择合适的模型和方法,逐步构建和优化你的投资策略。
代码逐行解释:
1 2 3 4 5 6 7 import numpy as npimport pandas as pdfrom sklearn.preprocessing import MinMaxScalerdef load_stock_data (file_path ): data = pd.read_csv(file_path, parse_dates=['Date' ], index_col='Date' ) return data
这段代码定义了一个名为 load_stock_data
的函数,用于加载和预处理股票数据。该函数接收一个参数
file_path
,即包含股票数据的CSV文件的路径。下面是逐行解释:
import numpy as np
:这行代码导入了NumPy库,并将其简称为np
。NumPy是Python中用于科学计算的一个基础库,提供了大量的数学函数操作以及高性能的多维数组对象。然而,在这段代码中,NumPy库实际上没有被直接使用,可能是为了其他部分的代码(未在此展示)准备的。
import pandas as pd
:这行代码导入了Pandas库,并将其简称为pd
。Pandas是Python中用于数据分析的一个强大库,提供了易于使用的数据结构和数据分析工具。这段代码主要使用了Pandas来处理CSV格式的股票数据。
from sklearn.preprocessing import MinMaxScaler
:这行代码从scikit-learn(一个流行的Python机器学习库)的预处理模块中导入了MinMaxScaler
类。MinMaxScaler
用于将特征缩放到给定的最小值和最大值之间(通常是0和1),这是数据预处理中常用的一种技术,可以帮助改善许多机器学习算法的性能。然而,在这段特定的代码中,MinMaxScaler
没有被直接使用,可能是为了后续的数据处理步骤预留的。
定义load_stock_data
函数:
def load_stock_data(file_path):
:定义了一个名为load_stock_data
的函数,它接受一个参数file_path
,即包含股票数据的CSV文件的路径。
加载和预处理数据:
data = pd.read_csv(file_path, parse_dates=['Date'], index_col='Date')
:使用Pandas的read_csv
函数加载CSV文件。parse_dates=['Date']
参数告诉Pandas将Date
列解析为日期时间类型。index_col='Date'
参数指定将Date
列用作DataFrame的行索引。这意味着加载后的DataFrame将使用日期时间作为索引,而不是默认的整数索引。
返回处理后的数据:
return data
:函数返回处理后的DataFrame,其中包含了按日期时间索引的股票数据。
综上所述,这段代码的主要作用是加载一个包含股票数据的CSV文件,将Date
列解析为日期时间类型,并将其作为行索引,然后返回这个处理后的DataFrame。虽然MinMaxScaler
被导入了,但在这段代码中并没有直接使用,可能是为了后续的数据标准化步骤准备的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def calculate_technical_indicators (data ): data['SMA' ] = data['close' ].rolling(window=10 ).mean() data['EMA' ] = data['close' ].ewm(span=10 , adjust=False ).mean() delta = data['close' ].diff() gain = (delta.where(delta > 0 , 0 )).rolling(window=14 ).mean() loss = (-delta.where(delta < 0 , 0 )).rolling(window=14 ).mean() rs = gain / loss data['RSI' ] = 100 - (100 / (1 + rs)) data['Bollinger Upper' ] = data['SMA' ] + 2 * \ data['close' ].rolling(window=20 ).std() data['Bollinger Lower' ] = data['SMA' ] - 2 * \ data['close' ].rolling(window=20 ).std() data = data.dropna() return data
这段代码定义了一个名为 calculate_technical_indicators
的函数,它接受一个包含股票价格数据的
DataFrame
(假设至少包含 close
列,即收盘价)作为输入,并计算并添加几个常见的技术指标到该
DataFrame
中,最后返回更新后的
DataFrame
。这些技术指标包括简单移动平均(SMA)、指数移动平均(EMA)、相对强弱指数(RSI)和布林带(Bollinger
Bands)。下面是对每个步骤的详细解释:
简单移动平均(SMA) :
使用 rolling(window=10).mean()
计算过去10天的收盘价的平均值,结果存储在 data['SMA']
中。
指数移动平均(EMA) :
使用 ewm(span=10, adjust=False).mean()
计算指数移动平均。这里的 span=10
大致相当于传统的平滑系数(alpha)的倒数,但 ewm
方法通过
span
参数自动计算平滑系数。adjust=False
表示不使用调整因子(即不将EMA的初始值设为第一个值)。结果存储在
data['EMA']
中。
相对强弱指数(RSI) :
首先计算收盘价的差异(delta
),即当天的收盘价与前一天的收盘价之差。
然后分别计算过去14天内所有上涨日(gain
)和下跌日(loss
)的平均值。上涨日定义为
delta > 0
的日子,下跌日定义为 delta < 0
的日子。注意,这里使用了 where
方法来将非上涨/下跌日的值设为0,以便计算平均值。
计算RS(相对强弱),即上涨平均除以下跌平均。
最后,使用RS值计算RSI(相对强弱指数),公式为
100 - (100 / (1 + rs))
。结果存储在 data['RSI']
中。
布林带(Bollinger Bands) :
计算过去20天的收盘价的标准差(std()
),并以此为基础计算布林带的上限和下限。
布林带上限是简单移动平均(SMA)加上两倍的标准差,存储在
data['Bollinger Upper']
中。
布林带下限是简单移动平均(SMA)减去两倍的标准差,存储在
data['Bollinger Lower']
中。
去除 NaN 值 :
使用 dropna()
方法去除所有包含NaN值的行。这是必要的,因为计算移动平均、标准差等统计量时,在数据集的开始部分会产生NaN值。
返回更新后的 DataFrame :
函数返回更新后包含新计算的技术指标的 DataFrame
。
这个函数是金融数据分析中常用的一个工具,可以帮助投资者和分析师更好地理解股票价格的动态和趋势。
1 2 3 4 5 def preprocess_data (data ): scaler = MinMaxScaler(feature_range=(0 , 1 )) scaled_data = scaler.fit_transform(data) return scaled_data, scaler
这个函数 preprocess_data
的目的是对给定的数据集
data
进行预处理,具体来说是进行特征缩放(Feature
Scaling),将数据的特征值缩放到一个指定的范围内,这里使用的是 0 到 1
的范围。这种预处理步骤对于许多机器学习算法来说是非常重要的,因为它可以帮助改善算法的收敛速度和性能。
让我们逐行解释这个函数:
scaler = MinMaxScaler(feature_range=(0, 1))
这行代码创建了一个 MinMaxScaler
对象,它是从
sklearn.preprocessing
模块中导入的。MinMaxScaler
将数据缩放到给定的最小值和最大值之间(这里是 0 和
1)。默认情况下,MinMaxScaler
会将数据缩放到 [0,
1],但通过设置 feature_range
参数,我们可以自定义这个范围。在这个例子中,我们显式地指定了
feature_range=(0, 1)
,尽管这是默认值。
scaled_data = scaler.fit_transform(data)
这行代码执行了两个步骤:首先,fit
方法计算了数据的最小值和最大值,这是进行缩放所必需的;然后,transform
方法使用这些计算出的最小值和最大值将原始数据 data
缩放到指定的范围内(即 0 到 1)。这两个步骤通过
fit_transform
方法合并为一个步骤,以提高效率。scaled_data
变量现在包含了缩放后的数据。
return scaled_data, scaler
最后,函数返回两个值:缩放后的数据 scaled_data
和
MinMaxScaler
对象 scaler
。返回
scaler
对象可能很有用,因为它允许我们在未来的数据点(比如测试集或新数据)上使用相同的缩放参数进行缩放,以确保数据的一致性和可比性。
总的来说,这个函数是一个用于数据预处理的实用工具,它通过特征缩放将数据集的特征值缩放到
0 到 1 的范围内,并返回缩放后的数据以及用于缩放的
MinMaxScaler
对象。
1 2 3 4 5 6 7 def create_dataset (data, seq_length ): X, y = [], [] for i in range (len (data) - seq_length - 1 ): X.append(data[i:(i + seq_length)]) y.append(data[i + seq_length, 3 ]) return np.array(X), np.array(y)
这个函数 create_dataset
的目的是从给定的数据
data
中创建一个用于监督学习(特别是时间序列预测)的数据集。它接收两个参数:data
和 seq_length
。data
是一个多维数组,其中包含了时间序列数据;seq_length
是一个整数,指定了每个输入序列的长度。函数返回两个数组:X
和 y
,其中 X
包含输入序列,y
包含每个输入序列对应的目标值(即下一个时间步的某个特定特征值)。
不过,需要注意的是,函数中存在一个潜在的错误或不一致之处,这取决于
data
的具体结构。下面是对函数行为的详细解释和潜在问题的说明:
初始化 X 和 y :函数开始时,通过空列表
X
和 y
来存储输入序列和目标值。
循环遍历数据 :函数通过一个循环遍历
data
,从索引 i
开始,直到
len(data) - seq_length - 1
。这是因为我们需要有足够的后续数据来作为目标值(即
y
),同时保持每个输入序列的长度为
seq_length
。
构建输入序列 X :在每次循环中,通过
data[i:(i + seq_length)]
从 data
中切取一个长度为 seq_length
的序列,并将其添加到
X
列表中。
构建目标值 y :这里有一个潜在的问题。代码
y.append(data[i + seq_length, 3])
试图从
data[i + seq_length]
中获取第四个元素(索引为 3,因为索引从
0 开始)。但是,这假设 data
是一个二维数组,并且每一行都至少有四个元素。如果 data
是一维数组或每行的元素数量少于四个,这将导致错误。此外,如果
data
是三维或更高维的,这种索引方式也是不正确的。
返回结果 :最后,函数将 X
和
y
列表转换为 NumPy 数组并返回它们。
修正建议 :
如果 data
是一维时间序列数据,并且你想要预测的是序列中下一个时间步的某个特定值(但不一定是第四个值),你应该首先确认这一点,并在代码中明确这一点。例如,如果你想要预测的是下一个时间步的值,那么你可以将
y.append(data[i + seq_length])
改为
y.append(data[i + seq_length])
(但这会假设你预测的是整个序列的下一个值,而不是某个特定特征)。
如果 data
是二维或多维的,并且你确实想要获取每个序列的下一个时间步的第四个元素作为目标值,你需要确保
data
的形状和结构符合这种索引方式。
如果 data
的形状和结构不确定,你可能需要添加一些检查来验证这些假设,或者在文档中清楚地说明函数的使用条件和要求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class LSTMCell : def __init__ (self, input_size, hidden_size ): self .input_size = input_size self .hidden_size = hidden_size self .Wf = np.random.randn(hidden_size, input_size + hidden_size) * 0.01 self .Wi = np.random.randn(hidden_size, input_size + hidden_size) * 0.01 self .Wc = np.random.randn(hidden_size, input_size + hidden_size) * 0.01 self .Wo = np.random.randn(hidden_size, input_size + hidden_size) * 0.01 self .bf = np.zeros((hidden_size, 1 )) self .bi = np.zeros((hidden_size, 1 )) self .bc = np.zeros((hidden_size, 1 )) self .bo = np.zeros((hidden_size, 1 )) def forward (self, x, h_prev, c_prev ): combined = np.concatenate((x, h_prev), axis=0 ) ft = sigmoid(np.dot(self .Wf, combined) + self .bf) it = sigmoid(np.dot(self .Wi, combined) + self .bi) c_hat = np.tanh(np.dot(self .Wc, combined) + self .bc) c = ft * c_prev + it * c_hat ot = sigmoid(np.dot(self .Wo, combined) + self .bo) h = ot * np.tanh(c) return h, cdef sigmoid (x ): return 1 / (1 + np.exp(-x))
您提供的 LSTMCell
类实现了一个基本的 LSTM (长短期记忆)
单元的前向传播过程。LSTM 是一种特殊的 RNN (循环神经网络)
单元,它通过引入“门”机制(遗忘门、输入门、输出门)来解决传统 RNN
在处理长序列时容易出现的梯度消失或梯度爆炸问题。下面是对您实现的详细解释和可能需要注意的地方:
初始化
在 __init__
方法中,您为 LSTM
单元的权重和偏置项进行了随机初始化,并将它们乘以一个小的系数(这里是
0.01)来减少初始化时的随机性对训练的影响。这是深度学习中的常见做法,称为权重初始化。
前向传播
在 forward
方法中,您实现了 LSTM
的前向传播过程,这包括计算遗忘门 ft
、输入门
it
、候选单元状态 c_hat
、更新后的单元状态
c
和输出门 ot
,最后输出隐藏状态
h
。
注意事项
维度匹配 :在
np.concatenate((x, h_prev), axis=0)
中,您沿着第一个轴(axis=0)拼接了输入 x
和前一个隐藏状态
h_prev
。然而,在 LSTM 中,通常 x
和
h_prev
的维度应该是 (input_size,)
和
(hidden_size,)
,并且它们应该沿着特征维度(即第二维,axis=1)进行拼接。因此,正确的拼接方式应该是
np.concatenate((x.reshape(-1, 1), h_prev.reshape(-1, 1)), axis=1)
,但更常见的是直接使用
np.hstack((x, h_prev))
或者假设 x
和
h_prev
已经是二维的,并直接拼接
np.concatenate((x, h_prev), axis=1)
。
广播机制 :在
ft * c_prev + it * c_hat
和 ot * np.tanh(c)
中,NumPy
的广播机制会自动处理不同形状的数组相乘。但是,为了避免潜在的维度问题,确保
ft
, it
, c_prev
,
c_hat
, ot
, 和 c
的形状符合预期是很重要的。
性能优化 :虽然您的实现是教育和研究目的的好例子,但在实际应用中,使用专门的深度学习库(如
TensorFlow 或 PyTorch)会更高效,因为这些库经过优化,可以在 GPU
上并行处理,并提供自动微分等功能。
示例修正
假设 x
和 h_prev
都是一维数组(或需要被转换为二维数组以便拼接),这里是一个简化的修正版本:
1 2 3 4 5 6 7 8 9 def forward (self, x, h_prev, c_prev ): x = x.reshape(-1 , 1 ) h_prev = h_prev.reshape(-1 , 1 ) combined = np.hstack((x, h_prev)) return h, c
注意,这里假设 x
和 h_prev
都是标量或一维数组,并通过 .reshape(-1, 1)
转换为二维数组,然后使用 np.hstack
沿着第二个轴(特征轴)进行拼接。如果它们已经是二维的,则可以直接使用
np.concatenate
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 def train_lstm (X, y, hidden_size, epochs, learning_rate ): input_size = X.shape[2 ] lstm_cell = LSTMCell(input_size, hidden_size) W_out = np.random.randn(1 , hidden_size) * 0.01 b_out = np.zeros((1 , 1 )) for epoch in range (epochs): total_loss = 0 for i in range (X.shape[0 ]): h_prev = np.zeros((hidden_size, 1 )) c_prev = np.zeros((hidden_size, 1 )) for t in range (X.shape[1 ]): x_t = X[i, t, :].reshape(-1 , 1 ) h_prev, c_prev = lstm_cell.forward(x_t, h_prev, c_prev) y_hat = np.dot(W_out, h_prev) + b_out loss = np.mean((y_hat - y[i]) ** 2 ) total_loss += loss dW_out = (y_hat - y[i]) * h_prev.T db_out = (y_hat - y[i]) W_out -= learning_rate * dW_out b_out -= learning_rate * db_out if epoch % 100 == 0 : print (f"Epoch {epoch} , Loss: {total_loss / X.shape[0 ]} " ) return lstm_cell, W_out, b_out
您提供的 train_lstm
函数实现了使用单个 LSTM
单元进行序列数据训练的基本框架,但是有几个关键的问题和遗漏点需要注意和修正:
反向传播不完整 :
您的代码只实现了输出层的权重和偏置的更新,但没有实现 LSTM
单元内部权重(Wf
, Wi
, Wc
,
Wo
和对应的偏置 bf
, bi
,
bc
, bo
)的更新。这意味着 LSTM
单元的参数在训练过程中保持不变,这显然是不正确的。
损失函数和梯度计算 :
您的损失函数是均方误差(MSE),这是合理的。但是,梯度计算
dW_out
和 db_out
的方式在批量处理时可能不够准确,因为它们只考虑了最后一个时间步的隐藏状态。在序列预测中,通常需要考虑整个序列的损失。
LSTM 单元内部更新 : 为了更新 LSTM
单元内部的权重,您需要实现 LSTM 的反向传播算法(也称为通过时间反向传播
BPTT)。这涉及到计算每个时间步的误差项,并将它们反向传播回 LSTM
单元以更新权重。
学习率和优化器 :
您使用了简单的梯度下降(SGD)来更新参数,这是可行的,但在实际应用中,更复杂的优化器(如
Adam、RMSprop 等)可能更有效。
序列长度和批量处理 :
您的代码目前是按单个序列进行训练的,没有实现批量处理。在实际应用中,为了提高效率,通常会同时处理多个序列。
由于实现 LSTM
的完整反向传播算法相当复杂,这里不会给出完整的代码,但我可以提供一些指导思路:
实现 LSTM
的反向传播 :您需要计算每个时间步的误差项,并将其反向传播回 LSTM
单元。这涉及到计算关于遗忘门、输入门、候选单元状态和输出门的梯度。
更新 LSTM 权重 :使用反向传播得到的梯度来更新 LSTM
单元的权重和偏置。
考虑序列和批量处理 :在循环中同时迭代多个序列和每个序列的多个时间步。
如果您正在学习 LSTM 并希望逐步构建理解,我建议从更简单的 RNN
单元开始,并逐步增加复杂性,直到您能够处理 LSTM。同时,使用像 TensorFlow
或 PyTorch
这样的深度学习框架可以大大简化这些过程,因为它们已经为您处理了大部分底层细节。
对于您当前的项目,如果您只是想快速了解 LSTM
的基本工作原理,并且不打算深入实现其反向传播算法,您可以使用这些框架来构建和训练
LSTM 模型。
1 2 3 4 5 6 7 8 9 def predict_lstm (X, lstm_cell, W_out, b_out ): h_prev = np.zeros((lstm_cell.hidden_size, 1 )) c_prev = np.zeros((lstm_cell.hidden_size, 1 )) for t in range (X.shape[1 ]): x_t = X[t, :].reshape(-1 , 1 ) h_prev, c_prev = lstm_cell.forward(x_t, h_prev, c_prev) y_hat = np.dot(W_out, h_prev) + b_out return y_hat
您提供的 predict_lstm
函数实现了使用训练好的 LSTM
单元进行序列预测的过程。这个函数接收输入序列 X
、训练好的
LSTM 单元 lstm_cell
、输出层的权重 W_out
和偏置
b_out
,并返回预测的输出
y_hat
。不过,有几个小地方可能需要调整或注意:
输入序列的形状 : 在您的函数中,您假设了
X
的形状是
(time_steps, input_size)
,即它是一个二维数组,其中第一维是时间步长,第二维是每个时间步的输入特征数量。这通常是合理的,但请确保这与您训练
LSTM 时使用的输入序列形状相匹配。
输出形状 : y_hat
的形状将是
(1, 1)
,因为它是一个单个预测值(假设 W_out
的形状是
(1, hidden_size)
)。如果您的任务是多类别分类或需要多个输出值,您需要相应地调整
W_out
的形状。
广播机制 : 在
np.dot(W_out, h_prev) + b_out
中,NumPy 的广播机制将确保
b_out
(形状 (1, 1)
)被正确地添加到
np.dot(W_out, h_prev)
的结果中。但是,请确保
W_out
和 h_prev
的形状是兼容的。
错误处理 :
您的函数没有错误处理机制。在实际应用中,添加一些基本的错误检查(如检查输入形状、权重和偏置的形状等)是一个好习惯。
批量预测 :
当前函数只能对单个序列进行预测。如果您需要同时预测多个序列,您需要将这个函数包装在一个循环或函数中,以便能够迭代多个输入序列。
函数注释 :
添加函数注释可以帮助其他开发者(或未来的您)更好地理解函数的用途、参数和返回值。
这里是一个添加了简单注释的 predict_lstm
函数示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def predict_lstm (X, lstm_cell, W_out, b_out ): """ 使用训练好的 LSTM 单元进行序列预测。 参数: - X: 输入序列,形状为 (time_steps, input_size) - lstm_cell: 训练好的 LSTMCell 实例 - W_out: 输出层的权重矩阵,形状为 (num_outputs, hidden_size) - b_out: 输出层的偏置向量,形状为 (num_outputs, 1) 返回: - y_hat: 预测的输出值,形状为 (num_outputs, 1) """ h_prev = np.zeros((lstm_cell.hidden_size, 1 )) c_prev = np.zeros((lstm_cell.hidden_size, 1 )) for t in range (X.shape[0 ]): x_t = X[t, :].reshape(-1 , 1 ) h_prev, c_prev = lstm_cell.forward(x_t, h_prev, c_prev) y_hat = np.dot(W_out, h_prev) + b_out return y_hat
请注意,我在遍历时间步时使用了 X.shape[0]
而不是
X.shape[1]
,因为 X
的形状是
(time_steps, input_size)
,所以时间步数应该是第一维。如果您的
X
形状实际上是
(batch_size, time_steps, input_size)
(即包含批量处理),则您需要进一步修改这个函数以处理批量数据。
1 2 3 4 5 6 7 8 file_path = 'data_600519.csv' data = load_stock_data(file_path) data = calculate_technical_indicators(data) scaled_data, scaler = preprocess_data( data[['open' , 'high' , 'low' , 'close' , 'volume' , 'SMA' , 'EMA' , 'RSI' , 'Bollinger Upper' , 'Bollinger Lower' ]]) seq_length = 10 X, y = create_dataset(scaled_data, seq_length)
下面是对您提供的代码段进行逐行解释,包括注释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 file_path = 'data_600519.csv' data = load_stock_data(file_path) data = calculate_technical_indicators(data) scaled_data, scaler = preprocess_data( data[['open' , 'high' , 'low' , 'close' , 'volume' , 'SMA' , 'EMA' , 'RSI' , 'Bollinger Upper' , 'Bollinger Lower' ]] ) seq_length = 10 X, y = create_dataset(scaled_data, seq_length)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 hidden_size = 10 epochs = 500 learning_rate = 0.5 lstm_cell, W_out, b_out = train_lstm(X, y, hidden_size, epochs, learning_rate) X_test, y_test = create_dataset(scaled_data[-20 :], seq_length) predicted = [predict_lstm(X_test[i], lstm_cell, W_out, b_out) for i in range (len (X_test))] predicted = np.array(predicted).reshape(-1 , 1 ) num_features = scaled_data.shape[1 ] predicted_full = np.zeros((predicted.shape[0 ], num_features)) predicted_full[:, 3 ] = predicted[:, 0 ] original_predicted_full = scaler.inverse_transform(predicted_full) original_predicted = original_predicted_full[:, 3 ] pingjun = np.mean(original_predicted)print (f"预测结果: {original_predicted} " )print (f"预测结果(平均值): {pingjun} " )
在您提供的代码段中,您首先训练了一个LSTM模型,并使用该模型进行了预测。然而,有几个地方需要注意或可能需要修改,以确保代码能够正确运行并产生预期的结果。以下是对代码段的详细解释和可能的修改建议:
训练 LSTM : 您假设有一个名为
train_lstm
的函数,它接收输入数据 X
、目标数据
y
、隐藏层大小 hidden_size
、训练轮次
epochs
和学习率 learning_rate
,并返回训练好的
LSTM 单元、输出层权重 W_out
和偏置
b_out
。这个函数的具体实现不在您提供的代码段中,但它是关键部分,需要确保它正确实现了
LSTM 的训练过程。
预测 :
您使用 create_dataset
函数从 scaled_data
的最后 20 个时间点中创建了一个测试数据集 X_test
和
y_test
。请注意,由于您正在使用 LSTM
进行预测,因此实际上可能不需要
y_test
(除非您打算在测试集上评估模型性能)。
您定义了一个 predict_lstm
函数(尽管它的实现细节不在这里),该函数应该接收单个时间步的输入序列
X_test[i]
、LSTM
单元、输出层权重和偏置,并返回预测结果。然而,在 LSTM
的上下文中,通常一次预测多个时间步(使用滑动窗口或类似方法),但这里您似乎是对每个时间步单独进行预测。
您将预测结果收集到一个列表中,并将其转换为 NumPy
数组。然后,您创建了一个与原始数据特征数量相同大小的零数组
predicted_full
,并将预测结果(假设是收盘价)放入该数组的适当列中。
逆变换和结果提取 :
您使用 scaler.inverse_transform
方法将缩放后的预测数据转换回原始比例。这是必要的,因为模型是在缩放后的数据上训练的。
您从转换后的数据中提取了“Close”列的预测值。
可能的修改 :
确保 predict_lstm
函数能够处理单个时间步的输入,并返回正确的预测值。如果 LSTM
单元设计为处理多个时间步,则可能需要修改该函数以适应这一点。
如果您的目标是进行多步预测(即预测未来多个时间点的收盘价),则可能需要修改预测循环以生成多个时间步的预测。
验证 scaler
是否确实包含了用于逆变换所需的所有信息(例如,均值和标准差)。
输出结果 :
您打印了预测结果及其平均值。这可以帮助您了解模型的预测性能。
注意 :
LSTM
通常用于处理序列数据,并且一次可以处理多个时间步。在您的代码中,您似乎对每个时间步单独进行了预测,这在实际应用中可能不是最优的。
确保 scaler
和其他预处理步骤(如特征选择)在训练和测试数据上保持一致。
以下是一个简化的预测步骤示例,假设 predict_lstm
函数已经能够处理整个序列(而不仅仅是单个时间步):
1 2 3 4 5 6 7 8 9 10 11 predicted_full = predict_lstm(X_test, lstm_cell, W_out, b_out) original_predicted_full = scaler.inverse_transform(predicted_full) original_predicted = original_predicted_full[:, 3 ] pingjun = np.mean(original_predicted)print (f"预测结果: {original_predicted} " )print (f"预测结果(平均值): {pingjun} " )
但是,请注意,这个简化的示例假设 predict_lstm
已经被相应地修改以处理整个序列。如果它仍然只能处理单个时间步,则您可能需要使用循环或其他方法来生成整个序列的预测。
用于抓取股价的python
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 import jsonimport requestsimport datetimeimport pandas as pd def get_price_day_tx (code, end_date='' , count=10 , frequency='1d' ): unit = 'week' if frequency in '1w' else 'month' if frequency in '1M' else 'day' if end_date: end_date = end_date.strftime( '%Y-%m-%d' ) if isinstance (end_date, datetime.date) else end_date.split(' ' )[0 ] end_date = '' if end_date == datetime.datetime.now().strftime( '%Y-%m-%d' ) else end_date URL = f'http://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={ code} ,{unit} ,,{end_date} ,{count} ,qfq' st = json.loads(requests.get(URL).content) ms = 'qfq' +unit stk = st['data' ][code] buf = stk[ms] if ms in stk else stk[unit] df = pd.DataFrame( buf, columns=['time' , 'open' , 'close' , 'high' , 'low' , 'volume' ], dtype='float' ) df.time = pd.to_datetime(df.time) df.set_index(['time' ], inplace=True ) df.index.name = '' return dfdef get_price_min_tx (code, end_date=None , count=10 , frequency='1d' ): ts = int (frequency[:-1 ]) if frequency[:-1 ].isdigit() else 1 if end_date: end_date = end_date.strftime( '%Y-%m-%d' ) if isinstance (end_date, datetime.date) else end_date.split(' ' )[0 ] URL = f'http://ifzq.gtimg.cn/appstock/app/kline/mkline?param={ code} ,m{ts} ,,{count} ' st = json.loads(requests.get(URL).content) buf = st['data' ][code]['m' +str (ts)] df = pd.DataFrame( buf, columns=['time' , 'open' , 'close' , 'high' , 'low' , 'volume' , 'n1' , 'n2' ]) df = df[['time' , 'open' , 'close' , 'high' , 'low' , 'volume' ]] df[['open' , 'close' , 'high' , 'low' , 'volume' ]] = df[[ 'open' , 'close' , 'high' , 'low' , 'volume' ]].astype('float' ) df.time = pd.to_datetime(df.time) df.set_index(['time' ], inplace=True ) df.index.name = '' df['close' ][-1 ] = float (st['data' ][code]['qt' ][code][3 ]) return dfdef get_price_sina (code, end_date='' , count=10 , frequency='60m' ): frequency = frequency.replace('1d' , '240m' ).replace( '1w' , '1200m' ).replace('1M' , '7200m' ) mcount = count ts = int (frequency[:-1 ]) if frequency[:-1 ].isdigit() else 1 if (end_date != '' ) & (frequency in ['240m' , '1200m' , '7200m' ]): end_date = pd.to_datetime(end_date) if not isinstance ( end_date, datetime.date) else end_date unit = 4 if frequency == '1200m' else 29 if frequency == '7200m' else 1 count = count+(datetime.datetime.now()-end_date).days//unit URL = f'http://money.finance.sina.com.cn/quotes_service/api/json_v2.php/CN_MarketData.getKLineData?symbol={ code} &scale={ts} &ma=5&datalen={count} ' dstr = json.loads(requests.get(URL).content) df = pd.DataFrame( dstr, columns=['day' , 'open' , 'high' , 'low' , 'close' , 'volume' ]) df['open' ] = df['open' ].astype(float ) df['high' ] = df['high' ].astype(float ) df['low' ] = df['low' ].astype(float ) df['close' ] = df['close' ].astype(float ) df['volume' ] = df['volume' ].astype(float ) df.day = pd.to_datetime(df.day) df.set_index(['day' ], inplace=True ) df.index.name = '' if (end_date != '' ) & (frequency in ['240m' , '1200m' , '7200m' ]): return df[df.index <= end_date][-mcount:] return dfdef get_price (code, end_date='' , count=10 , frequency='1d' , fields=[] ): xcode = code.replace('.XSHG' , '' ).replace('.XSHE' , '' ) xcode = 'sh' +xcode if ('XSHG' in code) else 'sz' + \ xcode if ('XSHE' in code) else code if frequency in ['1d' , '1w' , '1M' ]: try : return get_price_sina(xcode, end_date=end_date, count=count, frequency=frequency) except : return get_price_day_tx(xcode, end_date=end_date, count=count, frequency=frequency) if frequency in ['1m' , '5m' , '15m' , '30m' , '60m' ]: if frequency in '1m' : return get_price_min_tx(xcode, end_date=end_date, count=count, frequency=frequency) try : return get_price_sina(xcode, end_date=end_date, count=count, frequency=frequency) except : return get_price_min_tx(xcode, end_date=end_date, count=count, frequency=frequency)if __name__ == '__main__' : df = get_price('sh000001' , frequency='1d' , count=10 ) print ('上证指数日线行情\n' , df) df = get_price('000001.XSHG' , frequency='15m' , count=10 ) print ('上证指数分钟线\n' , df)"""获取股票历史价格""" import Ashare gupiao = input ('输入代码:' )if gupiao[0 ] == '6' : df = Ashare.get_price(f'sh{gupiao} ' , frequency='1d' , count=100 )else : df = Ashare.get_price(f'sz{gupiao} ' , frequency='1d' , count=100 )print ('上证指数历史行情\n' , df) output_File = f'{gupiao} ' df.to_csv(f'data_{output_File} .csv' )print (f"数据已经存储到data_{output_File} .csv" )